hyperloglockless/
lib.rs

1#![allow(rustdoc::bare_urls)]
2#![doc = include_str!("../README.md")]
3#![cfg_attr(not(feature = "std"), no_std)]
4
5extern crate alloc;
6use alloc::{boxed::Box, vec::Vec};
7use core::hash::{BuildHasher, Hash, Hasher};
8use core::iter::repeat;
9use core::sync::atomic::Ordering::Relaxed;
10
11#[cfg(feature = "loom")]
12pub(crate) use loom::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize};
13
14#[cfg(not(feature = "loom"))]
15pub(crate) use core::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize};
16
17#[cfg(all(feature = "loom", feature = "serde"))]
18compile_error!("features `loom` and `serde` are mutually exclusive");
19
20mod atomic_f64;
21use atomic_f64::AtomicF64;
22mod beta;
23use beta::beta_horner;
24mod hasher;
25pub use hasher::DefaultHasher;
26mod error;
27pub use error::Error;
28mod math;
29use math::*;
30
/// HyperLogLog is a data structure for the "count-distinct problem", approximating the number of distinct elements in a multiset.
///
/// # Example
/// ```rust
/// use hyperloglockless::HyperLogLog;
///
/// let mut hll = HyperLogLog::new(16);
/// hll.insert("42");
/// hll.insert("🦀");
///
/// let count = hll.count();
/// ```
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HyperLogLog<S = DefaultHasher> {
    /// `registers[k]` is the largest `1 + hash.trailing_zeros()` over all 64-bit
    /// hashes assigned to the kth register, or 0 if no hash has been assigned yet.
    registers: Box<[u8]>,
    /// `registers.len() == 1 << precision`
    precision: u32,
    /// Builds the hasher that `insert` uses to turn items into 64-bit hashes.
    hasher: S,
    /// Number of registers that are still 0 (cached so counting is O(1)).
    zeros: usize,
    /// Running sum of `2^-registers[k]` over all registers (starts at the
    /// register count, since `2^0 == 1`).
    sum: f64,
    /// Bias-correction factor, computed once from the register count at construction.
    correction: f64,
}
55
/// HyperLogLog is a data structure for the "count-distinct problem", approximating the number of distinct elements in a multiset.
/// [`AtomicHyperLogLog`] is the thread-safe counterpart of [`HyperLogLog`].
///
/// # Example
/// ```rust
/// use hyperloglockless::AtomicHyperLogLog;
///
/// let hll = AtomicHyperLogLog::new(16);
/// hll.insert("42");
/// hll.insert("🦀");
///
/// let count = hll.count();
/// ```
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AtomicHyperLogLog<S = DefaultHasher> {
    /// `registers[k]` is the largest `1 + hash.trailing_zeros()` over all 64-bit
    /// hashes assigned to the kth register, or 0 if no hash has been assigned yet.
    registers: Box<[AtomicU8]>,
    /// `registers.len() == 1 << precision`
    precision: u32,
    /// Builds the hasher that `insert` uses to turn items into 64-bit hashes.
    hasher: S,
    /// Number of registers that are still 0. Updated with its own atomic
    /// operation, separately from `registers` and `sum`.
    zeros: AtomicUsize,
    /// Running sum of `2^-registers[k]` over all registers (starts at the
    /// register count, since `2^0 == 1`).
    sum: AtomicF64,
    /// Bias-correction factor, computed once from the register count at construction.
    correction: f64,
}
81
82impl<S: BuildHasher> HyperLogLog<S> {
83    /// Returns a new `HyperLogLog` with `1 << precision` registers (1 byte each)
84    /// using the provided hasher.
85    pub fn with_hasher(precision: u8, hasher: S) -> Self {
86        validate_precision(precision);
87        let num_registers = 1 << precision;
88        let data: Vec<_> = repeat(0).take(num_registers).collect();
89        Self {
90            hasher,
91            precision: precision as u32,
92            zeros: data.len(),
93            correction: correction(data.len()),
94            registers: data.into(),
95            sum: f64::from(num_registers as u32),
96        }
97    }
98}
99
100impl<S: BuildHasher> AtomicHyperLogLog<S> {
101    /// Returns a new `AtomicHyperLogLog` with `1 << precision` registers (1 byte each)
102    /// using the provided hasher.
103    pub fn with_hasher(precision: u8, hasher: S) -> Self {
104        validate_precision(precision);
105        let num_registers = 1 << precision;
106        let data: Vec<_> = repeat(0).take(num_registers).map(AtomicU8::new).collect();
107        Self {
108            hasher,
109            precision: precision as u32,
110            zeros: AtomicUsize::new(data.len()),
111            correction: correction(data.len()),
112            registers: data.into(),
113            sum: AtomicF64::new(f64::from(num_registers as u32)),
114        }
115    }
116}
117
118macro_rules! impl_new {
119    ($name:ident) => {
120        impl $name {
121            /// Returns a new [`Self`] with `1 << precision` registers (1 byte each)
122            /// using the default hasher with a random seed.
123            pub fn new(precision: u8) -> $name<DefaultHasher> {
124                $name::with_hasher(precision, DefaultHasher::default())
125            }
126
127            /// Returns a new [`Self`] with `1 << precision` registers (1 byte each)
128            /// using the default hasher seeded with `seed`.
129            pub fn seeded(precision: u8, seed: u128) -> $name<DefaultHasher> {
130                $name::with_hasher(precision, DefaultHasher::seeded(&seed.to_be_bytes()))
131            }
132        }
133    };
134}
135
136impl_new!(HyperLogLog);
137impl_new!(AtomicHyperLogLog);
138
139macro_rules! impl_common {
140    ($name:ident) => {
141        impl<S: BuildHasher> $name<S> {
142            /// Returns the number registers in `self`.
143            #[inline(always)]
144            pub fn len(&self) -> usize {
145                self.registers.len()
146            }
147
148            /// Returns the approximate number of elements in `self`.
149            #[inline]
150            pub fn count(&self) -> usize {
151                self.raw_count() as usize
152            }
153
154            #[inline(always)]
155            fn raw_count_inner(&self, zeros: usize, sum: f64) -> f64 {
156                let d = sum + beta_horner(zeros, self.precision);
157                self.correction * (self.len() * (self.len() - zeros)) as f64 / d
158            }
159        }
160
161        impl<T: Hash, S: BuildHasher> Extend<T> for $name<S> {
162            #[inline]
163            fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
164                self.insert_all(iter)
165            }
166        }
167
168        impl<S: BuildHasher> PartialEq for $name<S> {
169            fn eq(&self, other: &Self) -> bool {
170                if self.len() != other.len() {
171                    return false;
172                }
173                core::iter::zip(self.iter(), other.iter()).all(|(l, r)| l == r)
174            }
175        }
176        impl<S: BuildHasher> Eq for $name<S> {}
177    };
178}
179
180impl_common!(HyperLogLog);
181impl_common!(AtomicHyperLogLog);
182
183impl<S: BuildHasher> HyperLogLog<S> {
184    /// Returns an iterator over the value of each register.
185    #[inline]
186    pub fn iter(&self) -> impl Iterator<Item = u8> + '_ {
187        self.registers.iter().map(|x| *x)
188    }
189
190    /// Inserts the item into the HyperLogLog.
191    #[inline(always)]
192    pub fn insert<T: Hash + ?Sized>(&mut self, value: &T) {
193        let mut hasher = self.hasher.build_hasher();
194        value.hash(&mut hasher);
195        self.insert_hash(hasher.finish());
196    }
197
198    /// Inserts the hash of an item into the HyperLogLog.
199    #[inline(always)]
200    pub fn insert_hash(&mut self, hash: u64) {
201        let index = hash >> (64 - self.precision);
202        let new = 1 + hash.trailing_zeros() as u8;
203        self.update(new, index as usize);
204    }
205
206    #[inline(always)]
207    fn update(&mut self, new: u8, index: usize) {
208        let old = self.registers[index];
209        self.registers[index] = core::cmp::max(self.registers[index], new);
210        if old == 0 {
211            self.zeros -= 1;
212        }
213        if old < new {
214            let diff = harmonic_term(old) - harmonic_term(new);
215            self.sum -= diff;
216        }
217    }
218
219    /// Inserts all the items in `iter` into the `self`.
220    #[inline]
221    pub fn insert_all<T: Hash, I: IntoIterator<Item = T>>(&mut self, iter: I) {
222        for val in iter {
223            self.insert(&val);
224        }
225    }
226
227    /// Merges another HyperLogLog into `self`, updating the count.
228    /// Returns `Err(Error::IncompatibleLength)` if the two HyperLogLogs have
229    /// different length ([`Self::len`]).
230    #[inline(always)]
231    pub fn merge(&mut self, other: &Self) -> Result<(), Error> {
232        if self.len() != other.len() {
233            return Err(Error::IncompatibleLength);
234        }
235
236        // TODO? if self.hasher != other.hasher { ... }
237
238        for (i, x) in other.iter().enumerate() {
239            self.update(x, i);
240        }
241
242        Ok(())
243    }
244
245    /// Returns the approximate number of elements in `self`.
246    #[inline]
247    pub fn raw_count(&self) -> f64 {
248        let zeros = self.zeros;
249        let sum = self.sum;
250        self.raw_count_inner(zeros, sum)
251    }
252}
253
254impl<S: BuildHasher> AtomicHyperLogLog<S> {
255    /// Returns an iterator over the value of each register.
256    #[inline]
257    pub fn iter(&self) -> impl Iterator<Item = u8> + '_ {
258        self.registers.iter().map(|x| x.load(Relaxed))
259    }
260
261    /// Inserts the item into the HyperLogLog.
262    #[inline(always)]
263    pub fn insert<T: Hash + ?Sized>(&self, value: &T) {
264        let mut hasher = self.hasher.build_hasher();
265        value.hash(&mut hasher);
266        self.insert_hash(hasher.finish());
267    }
268
269    /// Inserts the hash of an item into the HyperLogLog.
270    #[inline(always)]
271    pub fn insert_hash(&self, hash: u64) {
272        let index = hash >> (64 - self.precision);
273        let new = 1 + hash.trailing_zeros() as u8;
274        self.update(new, index as usize);
275    }
276
277    #[inline(always)]
278    fn update(&self, new: u8, index: usize) {
279        let old = self.registers[index].fetch_max(new, Relaxed);
280        if old == 0 {
281            self.zeros.fetch_sub(1, Relaxed);
282        }
283        if old < new {
284            let diff = harmonic_term(old) - harmonic_term(new);
285            self.sum.fetch_sub(diff, Relaxed);
286        }
287    }
288
289    /// Inserts all the items in `iter` into the `self`. Immutable version of [`Self::extend`].
290    #[inline]
291    pub fn insert_all<T: Hash, I: IntoIterator<Item = T>>(&self, iter: I) {
292        for val in iter {
293            self.insert(&val);
294        }
295    }
296
297    /// Merges another HyperLogLog into `self`, updating the count.
298    /// Returns `Err(Error::IncompatibleLength)` if the two HyperLogLogs have
299    /// different length ([`Self::len`]).
300    #[inline(always)]
301    pub fn merge(&self, other: &Self) -> Result<(), Error> {
302        if self.len() != other.len() {
303            return Err(Error::IncompatibleLength);
304        }
305
306        // TODO? if self.hasher != other.hasher { ... }
307
308        for (i, x) in other.iter().enumerate() {
309            self.update(x, i);
310        }
311
312        Ok(())
313    }
314
315    /// Returns the approximate number of elements in `self`.
316    #[inline]
317    pub fn raw_count(&self) -> f64 {
318        let zeros = self.zeros.load(Relaxed);
319        let sum = self.sum.load(Relaxed);
320        self.raw_count_inner(zeros, sum)
321    }
322}
323
324impl<S: BuildHasher + Clone> Clone for AtomicHyperLogLog<S> {
325    fn clone(&self) -> Self {
326        Self {
327            hasher: self.hasher.clone(),
328            precision: self.precision,
329            zeros: AtomicUsize::new(self.zeros.load(Relaxed)),
330            correction: self.correction,
331            registers: self.iter().map(AtomicU8::new).collect::<Vec<_>>().into(),
332            sum: AtomicF64::new(self.sum.load(Relaxed)),
333        }
334    }
335}
336
/// Panics unless `precision` is within the supported range `4..=18`.
#[inline]
fn validate_precision(precision: u8) {
    if !matches!(precision, 4..=18) {
        panic!("Precisions 4..=18 supported only.");
    }
}
344
/// Returns `2^-x`, i.e. `1.0 / ((1 << x) as f64)` for small `x`.
///
/// Avoids a division by writing the IEEE-754 biased exponent `1023 - x`
/// straight into an `f64` with a zero sign and zero mantissa. For every `u8`
/// input the exponent stays in the normal range, so the result is exact.
#[inline(always)]
fn harmonic_term(x: u8) -> f64 {
    let biased_exponent = 1023 - u64::from(x);
    f64::from_bits(biased_exponent << 52)
}
350
351/// Returns the HyperLogLog precision that will have the error for calls to `count` and `raw_count`.
352#[inline]
353pub fn precision_for_error(error: f64) -> u8 {
354    assert!(0.0 < error && error < 1.0);
355    let bias_constant = 1.0389617614136892; // (3.0 * 2.0f64.ln() - 1.0).sqrt();
356    ceil(log2(pow(bias_constant / error, 2.0))) as u8
357}
358
359/// Returns the approximate error of `count` and `raw_count` given the precision of a [`HyperLogLog`] or [`AtomicHyperLogLog`].
360#[inline]
361pub fn error_for_precision(precision: u8) -> f64 {
362    validate_precision(precision);
363    let bias_constant = 1.0389617614136892; // (3.0 * 2.0f64.ln() - 1.0).sqrt();
364    bias_constant / sqrt((1u64 << precision) as f64)
365}
366
/// Returns the bias-correction factor for a sketch with `count` registers.
///
/// Register counts of 16, 32 and 64 use fixed tabulated values. Every other
/// count uses the closed form `base / (1 + approx / count)`.
#[inline(always)]
fn correction(count: usize) -> f64 {
    // Hardcoded since the result of f64::ln varies by platform:
    // BASE = 1.0 / (2.0 * 2.0f64.ln()), APPROX = 3.0 * 2.0f64.ln() - 1.0
    const BASE: f64 = 0.7213475204444817;
    const APPROX: f64 = 1.0794415416798357;
    if count == 16 {
        0.673
    } else if count == 32 {
        0.697
    } else if count == 64 {
        0.709
    } else {
        let m = count as f64;
        BASE / (1.0 + APPROX / m)
    }
}
379
// Generates a `#[cfg(test)]` module named `$modname` that runs the same suite
// against `$name` (`HyperLogLog` or `AtomicHyperLogLog`). The suite is
// compiled out when the `loom` feature is enabled (see `loom_tests`).
macro_rules! impl_tests {
    ($modname:ident, $name:ident) => {
        // `let mut` is required for `HyperLogLog` (its `insert`, `insert_all`
        // and `merge` take `&mut self`) but is unused for `AtomicHyperLogLog`.
        #[allow(unused_mut)]
        #[cfg(not(feature = "loom"))]
        #[cfg(test)]
        mod $modname {
            use super::*;
            // A clone compares equal to its source, and inserting into the
            // clone does not affect the source.
            #[test]
            fn test_clone() {
                let mut hll = $name::seeded(4, 42);
                hll.insert_all(1..10);
                let mut cloned = hll.clone();
                assert_eq!(hll, cloned);
                cloned.insert(&42);
                assert!(hll != cloned);
            }

            #[test]
            fn test_low_error() {
                for (p, thresh) in [(8, 0.15), (9, 0.15), (10, 0.15)] {
                    low_error(p, thresh);
                }
            }

            // Inserts the integers 1..10_000_000 and, every 100_000 inserts
            // (written `1_00_000` below), asserts that the relative error of
            // `raw_count` is below `thresh`.
            fn low_error(precision: u8, thresh: f64) {
                let mut hll = $name::seeded(precision, 42);
                let mut counted = 0;
                let mut total_err = 0f64;
                let mut total_diff = 0f64;

                for x in 1..10_000_000 {
                    hll.insert(&x);
                    if x % 1_00_000 == 0 {
                        let real = x as f64;
                        let diff = hll.raw_count() - real;
                        let err = diff.abs() / real;
                        assert!(err < thresh, "{}", err);

                        counted += 1;
                        total_err += err;
                        total_diff += diff / real;
                    }
                }

                // `total_err - total_diff` equals twice the summed magnitude of
                // the relative *underestimates*. This therefore requires the
                // mean underestimate (overestimates counting as 0) to exceed 0.5%.
                // NOTE(review): if the intent was a low-bias check, the usual
                // form is `total_diff.abs() / counted as f64 < 0.01` — confirm.
                assert!((total_err - total_diff).abs() / counted as f64 > 0.01);
            }

            // Merges sketches of 1..2000 and 1000..3000 (2999 distinct values).
            // `my_acc` is 1 minus the relative error of the merged count
            // against 3000, so the check allows up to 25% error.
            #[test]
            fn test_merge() {
                let mut left = $name::seeded(8, 42);
                let mut right = $name::seeded(8, 42);

                for x in 1..2000 {
                    left.insert(&x);
                }
                for x in 1000..3000 {
                    right.insert(&x);
                }

                left.merge(&right).unwrap();

                let real = 3000 as f64;
                let my_acc = (real - (left.count() as f64 - real).abs()) / real;
                assert!(my_acc > 0.75, "{}", my_acc);
            }

            // Round-trips the sketch through CBOR at every supported precision,
            // then checks the deserialized copy keeps tracking identically.
            #[cfg(feature = "serde")]
            #[test]
            fn test_serde() {
                for precision in 4..=18 {
                    let mut before = $name::seeded(precision, 42);
                    before.extend(0..=1000);

                    let s = serde_cbor::to_vec(&before).unwrap();
                    let mut after: $name = serde_cbor::from_slice(&s).unwrap();
                    assert_eq!(before, after);

                    before.extend(1000..=2000);
                    after.extend(1000..=2000);
                    assert_eq!(before, after);
                }
            }

            // `precision_for_error` inverts `error_for_precision` over 4..=18.
            #[test]
            fn test_error_helpers() {
                for precision in 4..=18 {
                    let err = error_for_precision(precision);
                    let prec = precision_for_error(err);
                    assert_eq!(prec, precision);
                }
            }
        }
    };
}

impl_tests!(non_atomic, HyperLogLog);
impl_tests!(atomic, AtomicHyperLogLog);
477
// Concurrency test run under loom. Two threads insert overlapping ranges
// (both insert `2`) into one shared `AtomicHyperLogLog`. The final registers,
// `zeros` and `sum` must match a sketch that received 1..=4 on a single thread.
#[cfg(feature = "loom")]
#[cfg(test)]
mod loom_tests {
    use super::*;
    use core::sync::atomic::Ordering;

    #[test]
    fn test_loom() {
        // `loom::model` executes this closure under loom's scheduler, which
        // explores thread interleavings.
        loom::model(|| {
            let hll = loom::sync::Arc::new(AtomicHyperLogLog::seeded(4, 42));
            // Same seed, so the sequential reference hashes items identically.
            let expected = AtomicHyperLogLog::seeded(4, 42);
            expected.insert_all(1..=4);
            let handles: Vec<_> = [(1..=2), (2..=4)]
                .into_iter()
                .map(|data| {
                    let v = hll.clone();
                    loom::thread::spawn(move || v.insert_all(data))
                })
                .collect();

            for handle in handles {
                handle.join().unwrap();
            }
            let res = hll.iter().collect::<Vec<_>>();
            assert_eq!(res, expected.iter().collect::<Vec<_>>());
            // The cached counters must agree too, not just the registers.
            assert_eq!(
                hll.zeros.load(Ordering::SeqCst),
                expected.zeros.load(Ordering::SeqCst)
            );
            assert_eq!(
                hll.sum.load(Ordering::SeqCst),
                expected.sum.load(Ordering::SeqCst)
            );
        });
    }
}