use super::DefaultEstimator;
use crate::traits::Word;
use crate::traits::{
EstimationLogic, MergeEstimationLogic, SliceEstimationLogic, assert_backend_len,
};
use num_primitive::{PrimitiveNumber, PrimitiveNumberAs};
use std::hash::*;
use std::num::NonZeroUsize;
use std::{borrow::Borrow, f64::consts::LN_2, fmt};
/// Errors returned when a HyperLogLog logic cannot be built from the
/// requested parameters.
///
/// The type is comparable (`PartialEq`/`Eq`) so that callers can use
/// `assert_eq!` and pattern-free comparisons on build results.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HyperLogLogError {
    /// The register size derived from the upper bound on the number of
    /// elements exceeds the maximum supported with the hash type in use.
    RegisterSizeTooLarge {
        /// The register size, in bits, that was derived.
        register_size: usize,
        /// The upper bound on the number of distinct elements from which
        /// `register_size` was derived.
        num_elements: usize,
        /// The width in bits of the hash type.
        hash_bits: u32,
    },
    /// The size in bits of an estimator is not a multiple of the word size,
    /// so estimators cannot be stored in a whole number of words.
    UnalignedBackend {
        /// The size of one estimator, in bits.
        est_size_in_bits: usize,
        /// The width in bits of the requested word type.
        word_bits: usize,
        /// The name of the widest unsigned integer type whose width divides
        /// `est_size_in_bits`.
        min_alignment: String,
        /// The smallest base-2 logarithm of the number of registers for which
        /// the requested word type would be usable.
        min_log2_num_regs: u32,
    },
}
/// User-facing messages for [`HyperLogLogError`]; each message states the
/// offending values and suggests how to change the parameters.
impl fmt::Display for HyperLogLogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            // Numeric fields are `Copy`, so they are bound by value.
            Self::RegisterSizeTooLarge {
                register_size: size,
                num_elements: bound,
                hash_bits: width,
            } => f.write_fmt(format_args!(
                "register size {} (derived from num_elements = {}) exceeds the maximum of 6 \
                 supported with a {}-bit hash type; reduce num_elements or use a hash with more bits",
                size, bound, width,
            )),
            // The suggested type name is a `String`, hence the `ref` binding.
            Self::UnalignedBackend {
                est_size_in_bits: est_bits,
                word_bits: word,
                min_alignment: ref widest,
                min_log2_num_regs: min_log2,
            } => f.write_fmt(format_args!(
                "estimator size ({} bits) is not divisible by the word size ({} bits), \
                 so register backends cannot be aligned; use {} or a smaller unsigned integer type \
                 as the word type, or increase log2_num_regs (possibly by reducing \
                 the relative standard deviation) to at least {}",
                est_bits, word, widest, min_log2,
            )),
        }
    }
}
// Marker implementation: all `std::error::Error` methods keep their defaults,
// so the error carries no underlying source.
impl std::error::Error for HyperLogLogError {}
/// The integer type of hash values and of the masks applied to them.
type HashResult = u64;
/// Coefficients of the bias-correction polynomial evaluated by
/// [`beta_horner`].
///
/// There is one row per supported base-2 logarithm of the number of
/// registers; the row at index `i` is used for `log2_num_regs == i + 4`, so
/// the fifteen rows cover the values 4 through 18.
///
/// Within a row, the element at index 0 multiplies the number of zero
/// registers `z`, and the element at index `k` (for `k` in `1..=7`)
/// multiplies `ln(z + 1)` raised to the power `k`.
///
/// NOTE(review): these look like the published LogLog-β coefficients;
/// confirm against the reference before editing any digit.
const LOGLOG_BETA: [[f64; 8]; 15] = [
// log2_num_regs = 4
[
-0.582581413904517,
-1.935_300_357_560_05,
11.079323758035073,
-22.131357446444323,
22.505391846630037,
-12.000723834917984,
3.220579408194167,
-0.342225302271235,
],
// log2_num_regs = 5
[
-0.7518999460733967,
-0.959_003_007_774_876,
5.599_737_132_214_161,
-8.209_763_699_976_552,
6.509_125_489_447_204,
-2.683_029_373_432_373,
0.5612891113138221,
-0.0463331622196545,
],
// log2_num_regs = 6
[
29.825_790_096_961_963,
-31.328_708_333_772_592,
-10.594_252_303_658_228,
-11.572_012_568_909_962,
3.818_875_437_390_749,
-2.416_013_032_853_081,
0.4542208940970826,
-0.0575155452020420,
],
// log2_num_regs = 7
[
2.810_292_129_082_006,
-3.9780498518175995,
1.3162680041351582,
-3.925_248_633_580_59,
2.008_083_575_394_647,
-0.7527151937556955,
0.1265569894242751,
-0.0109946438726240,
],
// log2_num_regs = 8
[
1.006_335_448_875_505_2,
-2.005_806_664_051_124,
1.643_697_493_665_141_2,
-2.705_608_099_405_661_7,
1.392_099_802_442_226,
-0.464_703_742_721_831_9,
0.07384282377269775,
-0.00578554885254223,
],
// log2_num_regs = 9
[
-0.09415657458167959,
-0.781_309_759_245_505_3,
1.715_149_467_507_124_6,
-1.737_112_504_065_163_4,
0.864_415_084_890_489_2,
-0.23819027465047218,
0.03343448400269076,
-0.00207858528178157,
],
// log2_num_regs = 10
[
-0.25935400670790054,
-0.525_983_019_998_058_1,
1.489_330_349_258_768_4,
-1.296_427_140_849_935_7,
0.622_847_562_172_216_2,
-0.156_723_267_702_510_4,
0.02054415903878563,
-0.00112488483925502,
],
// log2_num_regs = 11
[
-4.32325553856025e-01,
-1.08450736399632e-01,
6.09156550741120e-01,
-1.65687801845180e-02,
-7.95829341087617e-02,
4.71830602102918e-02,
-7.81372902346934e-03,
5.84268708489995e-04,
],
// log2_num_regs = 12
[
-3.84979202588598e-01,
1.83162233114364e-01,
1.30396688841854e-01,
7.04838927629266e-02,
-8.95893971464453e-03,
1.13010036741605e-02,
-1.94285569591290e-03,
2.25435774024964e-04,
],
// log2_num_regs = 13
[
-0.41655270946462997,
-0.22146677040685156,
0.38862131236999947,
0.453_409_797_460_623_7,
-0.36264738324476375,
0.12304650053558529,
-0.017_015_403_845_555_1,
0.00102750367080838,
],
// log2_num_regs = 14
[
-3.71009760230692e-01,
9.78811941207509e-03,
1.85796293324165e-01,
2.03015527328432e-01,
-1.16710521803686e-01,
4.31106699492820e-02,
-5.99583540511831e-03,
4.49704299509437e-04,
],
// log2_num_regs = 15
[
-0.38215145543875273,
-0.890_694_005_360_908_4,
0.376_023_357_746_788_7,
0.993_359_774_406_823_8,
-0.655_774_416_383_189_6,
0.183_323_421_297_036_1,
-0.02241529633062872,
0.00121399789330194,
],
// log2_num_regs = 16
[
-0.373_318_766_437_530_6,
-1.417_040_774_481_23,
0.40729184796612533,
1.561_520_339_065_841_6,
-0.992_422_335_342_861_3,
0.260_646_813_994_830_9,
-0.03053811369682807,
0.00155770210179105,
],
// log2_num_regs = 17
[
-0.36775502299404605,
0.538_314_223_513_779_7,
0.769_702_892_787_679_2,
0.550_025_835_864_505_6,
-0.745_755_882_611_469_4,
0.257_118_357_858_219_5,
-0.03437902606864149,
0.00185949146371616,
],
// log2_num_regs = 18
[
-0.364_796_233_259_605_4,
0.997_304_123_286_350_3,
1.553_543_862_300_812_2,
1.259_326_771_980_289_2,
-1.533_259_482_091_101_6,
0.478_010_422_000_565_9,
-0.05951025172951174,
0.00291076804642205,
],
];
/// Evaluates the bias-correction polynomial for `z` zero registers.
///
/// With `c` the row of [`LOGLOG_BETA`] selected by `log2_num_regs` and
/// `l = ln(z + 1)`, the result is
/// `c[0]·z + c[1]·l + c[2]·l² + … + c[7]·l⁷`, computed with Horner's rule.
///
/// # Panics
///
/// Panics if `log2_num_regs` is not in the range `4..=18`, as no row of
/// coefficients exists outside that range.
pub fn beta_horner(z: f64, log2_num_regs: usize) -> f64 {
    let coefficients = LOGLOG_BETA[log2_num_regs - 4];
    let log_z = (z + 1.0).ln();
    // Horner's rule over the coefficients of the powers of `log_z`, from the
    // highest degree (index 7) down to the linear one (index 1).
    let horner = coefficients[1..]
        .iter()
        .rev()
        .fold(0.0, |partial, &coefficient| partial * log_z + coefficient);
    // The last Horner step has no constant term: the remaining coefficient
    // multiplies `z` itself rather than a power of `log_z`.
    horner * log_z + coefficients[0] * z
}
/// Turns the raw quantities gathered from the registers into a cardinality
/// estimate.
///
/// # Arguments
///
/// * `harmonic_mean`: the sum of `2^-r` over all register values `r`.
/// * `zeroes`: the number of registers whose value is zero.
/// * `num_regs`: the number of registers.
/// * `log2_num_regs`: the base-2 logarithm of `num_regs`.
/// * `alpha_m_m`: the normalization constant multiplied by `num_regs²`.
///
/// When `BETA` is true, some register is zero, and `log2_num_regs` is at most
/// 18 (the largest value with tabulated coefficients), the estimate uses the
/// polynomial correction of [`beta_horner`]. Otherwise the classical estimate
/// `alpha_m_m / harmonic_mean` is used, replaced by `m·ln(m / zeroes)` when
/// some register is zero and the classical estimate is below `2.5·m`.
#[inline(never)]
pub(crate) fn apply_correction<const BETA: bool>(
    harmonic_mean: f64,
    zeroes: usize,
    num_regs: usize,
    log2_num_regs: u32,
    alpha_m_m: f64,
) -> f64 {
    let m = num_regs as f64;
    let some_zero = zeroes != 0;

    if BETA && some_zero && log2_num_regs <= 18 {
        let z = zeroes as f64;
        let correction = beta_horner(z, log2_num_regs as usize);
        return alpha_m_m * (m - z) / (m * (harmonic_mean + correction));
    }

    let raw_estimate = alpha_m_m / harmonic_mean;
    if some_zero && raw_estimate < 2.5 * m {
        // Small-range regime: fall back to counting empty registers.
        m * (m / zeroes as f64).ln()
    } else {
        raw_estimate
    }
}
/// Estimation logic for HyperLogLog estimators whose registers are packed
/// into a slice of words of type `W`.
///
/// `T` is the type of the elements being counted, `H` the hasher builder,
/// and `BETA` selects at compile time whether the polynomial correction
/// is applied when estimating.
#[derive(Debug, PartialEq)]
pub struct HyperLogLog<T, H, W = usize, const BETA: bool = true> {
/// Builder of the hashers used to hash elements.
build_hasher: H,
/// The size of a register, in bits.
register_size: usize,
/// The number of registers minus one, used as a mask to extract a
/// register index from the lowest bits of a hash.
num_regs_minus_1: HashResult,
/// The base-2 logarithm of the number of registers.
log2_num_regs: u32,
/// A single bit OR-ed into the shifted hash before counting trailing
/// zeros, so that the count never exceeds what a register can store.
sentinel_mask: HashResult,
/// The number of registers per estimator.
num_regs: usize,
/// The number of words of type `W` used by one estimator.
pub(super) words_per_estimator: usize,
/// The normalization constant multiplied by the square of the number of
/// registers.
alpha_m_m: f64,
/// A mask spanning one estimator with the most significant bit of each
/// register set.
msb_mask: Box<[W]>,
/// A mask spanning one estimator with the least significant bit of each
/// register set.
lsb_mask: Box<[W]>,
/// Ties the logic to the element type without storing any element.
_marker: std::marker::PhantomData<T>,
}
// `Clone` is implemented by hand so that no `T: Clone` bound is required:
// the element type only appears inside `PhantomData`.
impl<T, H: Clone, W: Clone, const BETA: bool> Clone for HyperLogLog<T, H, W, BETA> {
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct without
        // handling it here becomes a compile error.
        let Self {
            build_hasher,
            register_size,
            num_regs_minus_1,
            log2_num_regs,
            sentinel_mask,
            num_regs,
            words_per_estimator,
            alpha_m_m,
            msb_mask,
            lsb_mask,
            _marker: _,
        } = self;

        Self {
            build_hasher: build_hasher.clone(),
            register_size: *register_size,
            num_regs_minus_1: *num_regs_minus_1,
            log2_num_regs: *log2_num_regs,
            sentinel_mask: *sentinel_mask,
            num_regs: *num_regs,
            words_per_estimator: *words_per_estimator,
            alpha_m_m: *alpha_m_m,
            msb_mask: msb_mask.clone(),
            lsb_mask: lsb_mask.clone(),
            _marker: std::marker::PhantomData,
        }
    }
}
/// Accessors for the parameters of the logic and for the individual
/// registers packed in a backend.
impl<T, H: Clone, W: Word, const BETA: bool> HyperLogLog<T, H, W, BETA> {
/// Returns the base-2 logarithm of the number of registers per estimator.
pub fn log2_num_regs(&self) -> u32 {
self.log2_num_regs
}
/// Returns the value of the register at position `index` in `backend`.
///
/// Registers are `register_size` bits wide and are stored contiguously,
/// starting from the least significant bit of the first word; a register
/// may straddle two consecutive words.
///
/// # Safety
///
/// No bounds check is performed: the bits
/// `index * register_size..(index + 1) * register_size` must lie within
/// `backend`.
#[inline(always)]
unsafe fn get_register_unchecked(&self, backend: impl AsRef<[W]>, index: usize) -> W {
let backend = backend.as_ref();
let bits = W::BITS as usize;
let bit_width = self.register_size;
// The lowest `bit_width` bits set.
let mask = W::MAX >> (bits - bit_width);
let pos = index * bit_width;
let word_index = pos / bits;
let bit_index = pos % bits;
if bit_index + bit_width <= bits {
// The register lies entirely within one word.
(unsafe { *backend.get_unchecked(word_index) } >> bit_index) & mask
} else {
// The register straddles two words: its low part is at the top of the
// first word, its high part at the bottom of the next one.
((unsafe { *backend.get_unchecked(word_index) } >> bit_index)
| (unsafe { *backend.get_unchecked(word_index + 1) } << (bits - bit_index)))
& mask
}
}
/// Sets the register at position `index` in `backend` to `new_value`.
///
/// `new_value` is OR-ed into place without being masked, so it must fit in
/// `register_size` bits for neighboring registers to be left untouched.
///
/// # Safety
///
/// No bounds check is performed: the bits
/// `index * register_size..(index + 1) * register_size` must lie within
/// `backend`.
#[inline(always)]
unsafe fn set_register_unchecked(
&self,
mut backend: impl AsMut<[W]>,
index: usize,
new_value: W,
) {
let backend = backend.as_mut();
let bits = W::BITS as usize;
let bit_width = self.register_size;
// The lowest `bit_width` bits set.
let mask = W::MAX >> (bits - bit_width);
let pos = index * bit_width;
let word_index = pos / bits;
let bit_index = pos % bits;
if bit_index + bit_width <= bits {
// Single-word case: clear the register, then write the new value.
let mut word = unsafe { *backend.get_unchecked_mut(word_index) };
word &= !(mask << bit_index);
word |= new_value << bit_index;
unsafe { *backend.get_unchecked_mut(word_index) = word };
} else {
// Straddling case, first word: keep only the bits below the register,
// then write the low part of the new value at the top of the word.
let mut word = unsafe { *backend.get_unchecked_mut(word_index) };
word &= (W::ONE << bit_index) - W::ONE;
word |= new_value << bit_index;
unsafe { *backend.get_unchecked_mut(word_index) = word };
// Second word: clear the bits holding the high part of the register,
// then write the high part of the new value at the bottom of the word.
let mut word = unsafe { *backend.get_unchecked_mut(word_index + 1) };
word &= !(mask >> (bits - bit_index));
word |= new_value >> (bits - bit_index);
unsafe { *backend.get_unchecked_mut(word_index + 1) = word };
}
}
}
impl<T: Hash, H: BuildHasher + Clone, W: Word, const BETA: bool> SliceEstimationLogic<W>
for HyperLogLog<T, H, W, BETA>
where
u32: PrimitiveNumberAs<W>,
{
/// Returns the number of words of type `W` that one estimator occupies.
#[inline(always)]
fn backend_len(&self) -> usize {
self.words_per_estimator
}
}
/// Core estimator operations: creation, insertion, estimation, clearing and
/// copying, all acting on a backend made of words of type `W`.
impl<T: Hash, H: BuildHasher + Clone, W: Word, const BETA: bool> EstimationLogic
for HyperLogLog<T, H, W, BETA>
where
u32: PrimitiveNumberAs<W>,
{
type Item = T;
type Backend = [W];
type Estimator<'a>
= DefaultEstimator<Self, &'a Self, Box<[W]>>
where
T: 'a,
W: 'a,
H: 'a;
/// Creates a new estimator borrowing this logic, with a freshly allocated
/// backend in which every word is zero.
fn new_estimator(&self) -> Self::Estimator<'_> {
Self::Estimator::new(
self,
vec![W::ZERO; self.words_per_estimator].into_boxed_slice(),
)
}
/// Adds `element` to the estimator stored in `backend`.
///
/// The lowest `log2_num_regs` bits of the hash of the element select a
/// register; the register is raised to one plus the number of trailing
/// zeros of the remaining bits if that is larger than its current value.
fn add(&self, backend: &mut Self::Backend, element: impl Borrow<T>) {
// NOTE(review): presumably checks that `backend` has `backend_len()`
// words, which the unchecked register accesses below rely on; confirm
// against the macro definition.
assert_backend_len!(self, backend);
let hash = self.build_hasher.hash_one(element.borrow());
let register = (hash & self.num_regs_minus_1) as usize;
// OR-ing the sentinel bit bounds the number of trailing zeros, so that
// the value to store always fits in a register.
let r = ((hash >> self.log2_num_regs) | self.sentinel_mask)
.trailing_zeros()
.as_to();
debug_assert!(r < (W::ONE << self.register_size) - W::ONE);
debug_assert!(register < self.num_regs);
// SAFETY: `register` is smaller than `num_regs`; this additionally
// assumes `backend` holds all `num_regs` registers (see the note above).
let current_value = unsafe { self.get_register_unchecked(&*backend, register) };
let candidate_value = r + W::ONE;
let new_value = std::cmp::max(current_value, candidate_value);
// Skip the write when the register would not change.
if current_value != new_value {
// SAFETY: same index and backend as the read above.
unsafe { self.set_register_unchecked(backend, register, new_value) };
}
}
/// Returns an estimate of the number of distinct elements added to the
/// estimator stored in `backend`.
fn estimate(&self, backend: &[W]) -> f64 {
assert_backend_len!(self, backend);
// Despite the name, this accumulates the sum of 2^-value over all
// registers; the normalization is applied in `apply_correction`.
let mut harmonic_mean = 0.0;
let mut zeroes = 0usize;
for i in 0..self.num_regs {
// SAFETY: `i` is smaller than `num_regs`; this additionally assumes
// `backend` holds all `num_regs` registers.
let value: usize = unsafe { self.get_register_unchecked(backend, i).as_to() };
if value == 0 {
zeroes += 1;
}
debug_assert!(value <= 1023);
// Builds 2^-value directly from its IEEE 754 representation: a zero
// mantissa and a biased exponent of 1023 - value.
harmonic_mean += f64::from_bits((1023 - value as u64) << 52);
}
apply_correction::<BETA>(
harmonic_mean,
zeroes,
self.num_regs,
self.log2_num_regs,
self.alpha_m_m,
)
}
/// Resets the estimator stored in `backend` by zeroing every word.
#[inline(always)]
fn clear(&self, backend: &mut [W]) {
backend.fill(W::ZERO);
}
/// Copies the estimator stored in `src` into `dst`; the two slices must
/// have the same length.
#[inline(always)]
fn set(&self, dst: &mut [W], src: &[W]) {
debug_assert_eq!(dst.len(), src.len());
dst.copy_from_slice(src);
}
}
/// Scratch buffers reused across merges, so that merging two estimators
/// does not need to allocate.
pub struct HyperLogLogHelper<W> {
/// Accumulator holding intermediate results of the word-parallel merge.
acc: Vec<W>,
/// Temporary storage that ends up holding the selection mask of the merge.
mask: Vec<W>,
}
/// Merging of estimators by means of a word-parallel algorithm that uses a
/// reusable [`HyperLogLogHelper`] as scratch space.
impl<T: Hash, H: BuildHasher + Clone, W: Word, const BETA: bool> MergeEstimationLogic
    for HyperLogLog<T, H, W, BETA>
where
    u32: PrimitiveNumberAs<W>,
{
    type Helper = HyperLogLogHelper<W>;

    /// Creates a helper whose two buffers have one zeroed word for each word
    /// of an estimator.
    fn new_helper(&self) -> Self::Helper {
        let len = self.words_per_estimator;
        let acc = vec![W::ZERO; len];
        let mask = vec![W::ZERO; len];
        HyperLogLogHelper { acc, mask }
    }

    /// Merges the estimator stored in `src` into the one stored in `dst`,
    /// using the buffers of `helper` as scratch space.
    #[inline(always)]
    fn merge_with_helper(&self, dst: &mut [W], src: &[W], helper: &mut Self::Helper) {
        let HyperLogLogHelper { acc, mask } = helper;
        let msb_mask: &[W] = &self.msb_mask[..];
        let lsb_mask: &[W] = &self.lsb_mask[..];
        merge_hyperloglog_bitwise(
            dst,
            src,
            msb_mask,
            lsb_mask,
            acc,
            mask,
            self.register_size,
        );
    }
}
/// One-line summary of the parameters of the logic: relative standard
/// deviation (as a percentage), registers per estimator, bits per register,
/// and bytes per estimator.
impl<T, H, W, const BETA: bool> std::fmt::Display for HyperLogLog<T, H, W, BETA> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rsd_percent = 100.0 * HyperLogLog::rel_std(self.log2_num_regs);
        let registers = self.num_regs;
        let bits_per_register = self.register_size;
        let bytes_per_estimator = (registers * bits_per_register) / 8;
        write!(
            f,
            "HyperLogLog with relative standard deviation: {}% ({} registers/estimator, {} bits/register, {} bytes/estimator)",
            rsd_percent, registers, bits_per_register, bytes_per_estimator
        )
    }
}
/// Builder for [`HyperLogLog`] logics.
///
/// `H` is the type of the hasher builder, `W` the word type of the backend,
/// and `BETA` selects whether the built logic applies the polynomial
/// correction when estimating.
#[derive(Debug, Clone)]
pub struct HyperLogLogBuilder<H, W = usize, const BETA: bool = true> {
/// Builder of the hashers the logic will use to hash elements.
build_hasher: H,
/// The base-2 logarithm of the number of registers per estimator.
log2_num_regs: u32,
/// An upper bound on the number of distinct elements; the register size is
/// derived from it.
num_elements: usize,
/// Records the word type without storing any word.
_marker: std::marker::PhantomData<W>,
}
impl HyperLogLogBuilder<BuildHasherDefault<DefaultHasher>> {
    /// Creates a builder with the default hasher builder, the default word
    /// type and correction setting, and 2⁴ registers per estimator.
    ///
    /// # Arguments
    ///
    /// * `num_elements`: an upper bound on the number of distinct elements.
    ///
    /// # Panics
    ///
    /// Panics if `num_elements` is zero.
    pub const fn new(num_elements: usize) -> Self {
        assert!(
            num_elements > 0,
            "the upper bound on the number of distinct elements must be positive"
        );
        Self {
            num_elements,
            log2_num_regs: 4,
            build_hasher: BuildHasherDefault::new(),
            _marker: std::marker::PhantomData,
        }
    }
}
/// Returns the name of the widest unsigned integer type, among `u128`, `u64`,
/// `u32` and `u16`, whose width in bits divides `bits`, or `"u8"` if none
/// does.
fn min_alignment(bits: usize) -> String {
    // Candidate word types, widest first, so that the first match wins.
    const CANDIDATES: [(usize, &str); 4] = [(128, "u128"), (64, "u64"), (32, "u32"), (16, "u16")];

    CANDIDATES
        .iter()
        .find(|&&(width, _)| bits % width == 0)
        .map_or("u8", |&(_, name)| name)
        .to_string()
}
/// Parameter computations that do not depend on the element, hasher, or word
/// type; the unit type parameters only serve to name the functions.
impl HyperLogLog<(), (), (), true> {
    /// Returns the base-2 logarithm of the number of registers needed to
    /// attain the relative standard deviation `rsd`, and in any case at
    /// least 4.
    pub fn log2_num_of_registers(rsd: f64) -> u32 {
        (((1.106 / rsd).powi(2)).log2().ceil() as u32).max(4)
    }

    /// Returns the relative standard deviation of an estimator with
    /// `2^log2_num_regs` registers.
    ///
    /// # Panics
    ///
    /// In builds with overflow checks, panics if `log2_num_regs` is 64 or
    /// more.
    pub fn rel_std(log2_num_regs: u32) -> f64 {
        let tmp = match log2_num_regs {
            4 => 1.106,
            5 => 1.070,
            6 => 1.054,
            7 => 1.046,
            _ => 1.04,
        };
        // The shift must be performed on a 64-bit integer: an untyped literal
        // would default to `i32`, yielding a negative number (and thus NaN)
        // for 31, and overflowing for larger values.
        tmp / ((1u64 << log2_num_regs) as f64).sqrt()
    }

    /// Returns the register size in bits, given an upper bound on the number
    /// of distinct elements; the result is at least 5.
    // NOTE(review): this computes ln(log2(n) / ln 2), not log2(log2(n)); it
    // looks like a misplaced parenthesis, and as written the result is 5 for
    // every `usize`. It is left unchanged because a different register size
    // changes the layout of estimators; confirm the intended formula.
    pub fn register_size(num_elements: usize) -> usize {
        std::cmp::max(
            5,
            (((num_elements as f64).ln() / LN_2) / LN_2).ln().ceil() as usize,
        )
    }
}
/// Setters for the parameters of the logic and the final [`build`](Self::build)
/// step.
impl<H, W: Word, const BETA: bool> HyperLogLogBuilder<H, W, BETA> {
    /// Sets the number of registers to the smallest power of two attaining
    /// the relative standard deviation `rsd`.
    pub fn rsd(self, rsd: f64) -> Self {
        let log2_num_regs = HyperLogLog::log2_num_of_registers(rsd);
        self.log2_num_regs(log2_num_regs)
    }

    /// Sets the base-2 logarithm of the number of registers per estimator.
    ///
    /// # Panics
    ///
    /// Panics if `log2_num_regs` is smaller than 4.
    pub const fn log2_num_regs(mut self, log2_num_regs: u32) -> Self {
        assert!(
            log2_num_regs >= 4,
            "the logarithm of the number of registers per estimator should be at least 4"
        );
        self.log2_num_regs = log2_num_regs;
        self
    }

    /// Returns the smallest base-2 logarithm of the number of registers for
    /// which the size in bits of an estimator is a multiple of the width of
    /// the word type `W`, given the current upper bound on the number of
    /// elements.
    ///
    /// # Panics
    ///
    /// Panics if the derived register size is zero, or if the resulting
    /// number of registers is not a power of two.
    pub fn min_log2_num_regs(&self) -> u32 {
        let reg_bits = NonZeroUsize::try_from(HyperLogLog::register_size(self.num_elements))
            .expect("register_size is zero");
        // The power of two dividing the register size already contributes to
        // the alignment; the number of registers must supply the rest.
        let min_num_regs = W::BITS / highest_power_of_2_dividing(reg_bits);
        assert_eq!(min_num_regs, min_num_regs.next_power_of_two());
        min_num_regs.trailing_zeros()
    }

    /// Replaces the word type of the backend, keeping all other parameters.
    pub fn word_type<W2>(self) -> HyperLogLogBuilder<H, W2, BETA> {
        let Self {
            build_hasher,
            log2_num_regs,
            num_elements,
            ..
        } = self;
        HyperLogLogBuilder {
            num_elements,
            build_hasher,
            log2_num_regs,
            _marker: std::marker::PhantomData,
        }
    }

    /// Selects whether the built logic applies the polynomial correction,
    /// keeping all other parameters.
    pub fn beta<const BETA2: bool>(self) -> HyperLogLogBuilder<H, W, BETA2> {
        let Self {
            build_hasher,
            log2_num_regs,
            num_elements,
            ..
        } = self;
        HyperLogLogBuilder {
            num_elements,
            build_hasher,
            log2_num_regs,
            _marker: std::marker::PhantomData,
        }
    }

    /// Sets the upper bound on the number of distinct elements.
    ///
    /// # Panics
    ///
    /// Panics if `num_elements` is zero.
    pub const fn num_elements(mut self, num_elements: usize) -> Self {
        assert!(
            num_elements > 0,
            "the upper bound on the number of distinct elements must be positive"
        );
        self.num_elements = num_elements;
        self
    }

    /// Replaces the hasher builder, keeping all other parameters.
    pub fn build_hasher<H2>(self, build_hasher: H2) -> HyperLogLogBuilder<H2, W, BETA> {
        let Self {
            log2_num_regs,
            num_elements,
            ..
        } = self;
        HyperLogLogBuilder {
            num_elements,
            log2_num_regs,
            build_hasher,
            _marker: std::marker::PhantomData,
        }
    }

    /// Builds the logic for elements of type `T`.
    ///
    /// # Errors
    ///
    /// * [`HyperLogLogError::RegisterSizeTooLarge`] if the register size
    ///   derived from the number of elements is larger than 6;
    /// * [`HyperLogLogError::UnalignedBackend`] if the size in bits of an
    ///   estimator is not a multiple of the width of the word type.
    pub fn build<T>(self) -> Result<HyperLogLog<T, H, W, BETA>, HyperLogLogError> {
        let word_bits = W::BITS as usize;
        let log2_num_regs = self.log2_num_regs;
        let num_elements = self.num_elements;
        let num_regs: usize = 1 << log2_num_regs;

        let register_size = HyperLogLog::register_size(num_elements);
        if register_size > 6 {
            return Err(HyperLogLogError::RegisterSizeTooLarge {
                register_size,
                num_elements,
                hash_bits: HashResult::BITS,
            });
        }

        let est_size_in_bits = num_regs * register_size;
        if est_size_in_bits % word_bits != 0 {
            return Err(HyperLogLogError::UnalignedBackend {
                est_size_in_bits,
                word_bits,
                min_alignment: min_alignment(est_size_in_bits),
                min_log2_num_regs: self.min_log2_num_regs(),
            });
        }
        let words_per_estimator = est_size_in_bits / word_bits;

        // The sentinel bit caps the number of trailing zeros so that the
        // value stored in a register never exceeds the largest representable
        // one.
        let sentinel_mask = 1 << ((1 << register_size) - 2);
        let num_regs_minus_1 = (num_regs - 1) as HashResult;

        let alpha = match log2_num_regs {
            4 => 0.673,
            5 => 0.697,
            6 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / num_regs as f64),
        };
        let alpha_m_m = alpha * (num_regs as f64).powi(2);

        // Masks with the highest and the lowest bit of each register set.
        let msb_mask = build_register_mask(
            words_per_estimator,
            register_size,
            W::ONE << (register_size - 1),
        );
        let lsb_mask = build_register_mask(words_per_estimator, register_size, W::ONE);

        Ok(HyperLogLog {
            num_regs,
            num_regs_minus_1,
            log2_num_regs,
            register_size,
            alpha_m_m,
            sentinel_mask,
            build_hasher: self.build_hasher,
            msb_mask,
            lsb_mask,
            words_per_estimator,
            _marker: std::marker::PhantomData,
        })
    }
}
/// Builds a mask of `num_words` words in which `pattern` is repeated every
/// `register_size` bits, starting from bit zero of the first word.
///
/// A copy of the pattern that crosses a word boundary is continued in the
/// next word, if there is one.
fn build_register_mask<W: Word>(num_words: usize, register_size: usize, pattern: W) -> Box<[W]> {
    let word_bits = W::BITS as usize;
    let end = num_words * word_bits;
    let mut words = vec![W::ZERO; num_words];

    let mut offset = 0;
    while offset < end {
        let (word, shift) = (offset / word_bits, offset % word_bits);
        words[word] |= pattern << shift;

        // Spill the bits that did not fit into the following word.
        let straddles = shift + register_size > word_bits;
        if straddles && word + 1 < num_words {
            words[word + 1] |= pattern >> (word_bits - shift);
        }

        offset += register_size;
    }

    words.into_boxed_slice()
}
/// Subtracts `y` from `x` in place, treating both slices as multiword
/// integers whose least significant word comes first.
///
/// The borrow is propagated from each word to the next; a borrow out of the
/// last word is discarded. The two slices are expected to have the same
/// length.
#[inline(always)]
pub(super) fn subtract<W: Word>(x: &mut [W], y: &[W]) {
    debug_assert_eq!(x.len(), y.len());
    let mut borrow = false;
    for (minuend, &subtrahend) in x.iter_mut().zip(y.iter()) {
        let original = *minuend;
        // Apply the incoming borrow first.
        let adjusted = if borrow {
            original.wrapping_sub(W::ONE)
        } else {
            original
        };
        // A borrow goes out either because applying the incoming borrow
        // wrapped around (the word was zero), or because what is left is
        // smaller than the word to subtract.
        borrow = (borrow && original == W::ZERO) || adjusted < subtrahend;
        *minuend = adjusted.wrapping_sub(subtrahend);
    }
}
/// Merges the registers packed in `y` into those packed in `x`, working on
/// whole words rather than on one register at a time.
///
/// The selection mask computed below has all the bits of a register set
/// where the value from `y` must replace the one in `x`, so that each
/// register of `x` ends up holding the larger of the two values.
///
/// # Arguments
///
/// * `x`: the destination registers, updated in place; must not be empty.
/// * `y`: the source registers; same length as `x`.
/// * `msb_mask`: a mask with the highest bit of each register set.
/// * `lsb_mask`: a mask with the lowest bit of each register set.
/// * `acc`, `mask`: scratch buffers; their previous content is discarded.
/// * `register_size`: the size of a register, in bits.
fn merge_hyperloglog_bitwise<W: Word>(
mut x: impl AsMut<[W]>,
y: impl AsRef<[W]>,
msb_mask: impl AsRef<[W]>,
lsb_mask: impl AsRef<[W]>,
acc: &mut Vec<W>,
mask: &mut Vec<W>,
register_size: usize,
) {
let x = x.as_mut();
let y = y.as_ref();
let msb_mask = msb_mask.as_ref();
let lsb_mask = lsb_mask.as_ref();
debug_assert_eq!(x.len(), y.len());
debug_assert_eq!(x.len(), msb_mask.len());
debug_assert_eq!(x.len(), lsb_mask.len());
let register_size_minus_1 = register_size - 1;
let num_words_minus_1 = x.len() - 1;
let shift_register_size_minus_1 = W::BITS as usize - register_size_minus_1;
acc.clear();
mask.clear();
// Phase 1: register-by-register comparison of x and y. Load y with the
// highest bit of each register set into the accumulator...
acc.extend(
y.iter()
.zip(msb_mask)
.map(|(&y_word, &msb_word)| y_word | msb_word),
);
// ...and x with the highest bit of each register cleared into `mask`, used
// here as temporary storage.
mask.extend(
x.iter()
.zip(msb_mask)
.map(|(&x_word, &msb_word)| x_word & !msb_word),
);
// With the highest bits set in the minuend and cleared in the subtrahend,
// the multiword subtraction never borrows across registers.
subtract(acc, mask);
// Combine with y ^ x and y | !x, keeping only the highest bit of each
// register: that bit now records the outcome of the comparison.
acc.iter_mut()
.zip(x.iter())
.zip(y.iter())
.zip(msb_mask.iter())
.for_each(|(((acc_word, &x_word), &y_word), &msb_word)| {
*acc_word = ((*acc_word | (y_word ^ x_word)) ^ (y_word | !x_word)) & msb_word
});
// Phase 2: expand each comparison bit into a whole-register mask. First,
// shift the comparison bits right by register_size - 1 places across words
// (moving each to the lowest bit of its register) and set the highest bits.
{
let (mask_last, mask_slice) = mask.split_last_mut().unwrap();
let (&msb_last, msb_slice) = msb_mask.split_last().unwrap();
mask_slice
.iter_mut()
.zip(acc[0..num_words_minus_1].iter())
.zip(acc[1..].iter())
.zip(msb_slice.iter())
.rev()
.for_each(|(((mask_word, &acc_word), &next_acc_word), &msb_word)| {
*mask_word = (acc_word >> register_size_minus_1)
| (next_acc_word << shift_register_size_minus_1)
| msb_word
});
// The last word has no successor to take bits from.
*mask_last = (acc[num_words_minus_1] >> register_size_minus_1) | msb_last;
}
// Subtracting the lowest bit of each register, then fixing up the highest
// bit with the accumulator, leaves each register all ones or all zeros.
subtract(mask, lsb_mask);
mask.iter_mut()
.zip(msb_mask.iter())
.zip(acc.iter())
.for_each(|((mask_word, &msb_word), &acc_word)| {
*mask_word = (*mask_word | msb_word) ^ acc_word
});
// Finally, take the bits of y where the mask is set and those of x
// elsewhere.
x.iter_mut()
.zip(y.iter())
.zip(mask.iter())
.for_each(|((x_word, &y_word), &mask_word)| {
*x_word = *x_word ^ ((*x_word ^ y_word) & mask_word);
});
}
/// Returns the largest power of two that divides `n`.
fn highest_power_of_2_dividing(n: NonZeroUsize) -> u32 {
    // The number of trailing zeros of `n` is the exponent of that power.
    let exponent = n.trailing_zeros();
    1 << exponent
}
/// Checks `highest_power_of_2_dividing` against the known values for the
/// integers from 1 to 20.
#[test]
fn test_highest_power_of_2_dividing() {
    let expected = vec![1, 2, 1, 4, 1, 2, 1, 8, 1, 2, 1, 4, 1, 2, 1, 16, 1, 2, 1, 4];
    let computed: Vec<_> = (1..=20)
        .map(|n| highest_power_of_2_dividing(n.try_into().unwrap()))
        .collect();
    assert_eq!(computed, expected);
}