use serde::{Deserialize, Serialize};
use std::{
cmp::{self, Ordering}, convert::{identity, TryFrom}, fmt, hash::{Hash, Hasher}, marker::PhantomData, ops::{self, Range}
};
use xxhash_rust::xxh3::Xxh3;
use super::{f64_to_u8, u64_to_f64, usize_to_f64};
use crate::traits::{Intersect, IntersectPlusUnionIsPlus, New, UnionAssign};
mod consts;
use self::consts::{BIAS_DATA, RAW_ESTIMATE_DATA, TRESHOLD_DATA};
/// Newtype over [`HyperLogLog`] whose `PartialEq`/`Eq`/`PartialOrd`/`Ord` compare the
/// estimated cardinality (`len()`) of the wrapped sketch rather than its registers.
///
/// `#[serde(bound = "")]` means `V` itself need not implement `Serialize`/`Deserialize`.
#[derive(Serialize, Deserialize)]
#[serde(bound = "")]
pub struct HyperLogLogMagnitude<V>(HyperLogLog<V>);
impl<V: Hash> Ord for HyperLogLogMagnitude<V> {
    /// Orders sketches by their estimated cardinality.
    ///
    /// # Panics
    ///
    /// Panics if the two estimates are incomparable (a NaN estimate).
    #[inline(always)]
    fn cmp(&self, other: &Self) -> Ordering {
        let (lhs, rhs) = (self.0.len(), other.0.len());
        lhs.partial_cmp(&rhs).unwrap()
    }
}
impl<V: Hash> PartialOrd for HyperLogLogMagnitude<V> {
    /// Compares estimated cardinalities; `None` if either estimate is NaN.
    #[inline(always)]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = self.0.len();
        let rhs = other.0.len();
        lhs.partial_cmp(&rhs)
    }
}
impl<V: Hash> PartialEq for HyperLogLogMagnitude<V> {
    /// Two magnitudes are equal when their estimated cardinalities are equal as `f64`.
    #[inline(always)]
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.0.len();
        lhs == other.0.len()
    }
}
/// Marker impl: equality is the `f64` comparison of estimates from `PartialEq`.
impl<V> Eq for HyperLogLogMagnitude<V> where V: Hash {}
impl<V: Hash> Clone for HyperLogLogMagnitude<V> {
    /// Deep-copies the wrapped sketch (registers included).
    fn clone(&self) -> Self {
        let inner = Clone::clone(&self.0);
        Self(inner)
    }
}
impl<V: Hash> New for HyperLogLogMagnitude<V> {
type Config = f64;
fn new(config: &Self::Config) -> Self {
Self(New::new(config))
}
}
impl<V: Hash> Intersect for HyperLogLogMagnitude<V> {
fn intersect<'a>(iter: impl Iterator<Item = &'a Self>) -> Option<Self>
where
Self: Sized + 'a,
{
Intersect::intersect(iter.map(|x| &x.0)).map(Self)
}
}
impl<'a, V: Hash> UnionAssign<&'a HyperLogLogMagnitude<V>> for HyperLogLogMagnitude<V> {
fn union_assign(&mut self, rhs: &'a Self) {
self.0.union_assign(&rhs.0)
}
}
impl<'a, V: Hash> ops::AddAssign<&'a V> for HyperLogLogMagnitude<V> {
    /// Pushes a single value into the wrapped sketch.
    fn add_assign(&mut self, rhs: &'a V) {
        let inner = &mut self.0;
        *inner += rhs;
    }
}
impl<'a, V: Hash> ops::AddAssign<&'a Self> for HyperLogLogMagnitude<V> {
    /// Merges another magnitude's sketch into this one.
    fn add_assign(&mut self, rhs: &'a Self) {
        let inner = &mut self.0;
        *inner += &rhs.0;
    }
}
impl<V: Hash> fmt::Debug for HyperLogLogMagnitude<V> {
    /// Formats exactly like the wrapped sketch.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.0, fmt)
    }
}
impl<V> IntersectPlusUnionIsPlus for HyperLogLogMagnitude<V> {
    /// Inherits the flag of the wrapped [`HyperLogLog`] (meaning defined in `crate::traits`).
    const VAL: bool =
        <HyperLogLog<V> as IntersectPlusUnionIsPlus>::VAL;
}
/// HyperLogLog cardinality sketch over values of type `V`.
///
/// Values are hashed to 64 bits (XXH3 in `push`). The low `p` bits pick one of the
/// `2^p` registers and the remaining bits determine the rank stored there. `zero` and
/// `sum` are cached aggregates of the registers, so `len` does not rescan them.
///
/// `#[serde(bound = "")]` removes any `V: Serialize`/`Deserialize` requirement; `V`
/// only appears in `marker`.
#[derive(Serialize, Deserialize)]
#[serde(bound = "")]
pub struct HyperLogLog<V: ?Sized> {
    /// Bias-correction constant for this precision (see `get_alpha`).
    alpha: f64,
    /// Number of registers currently equal to 0.
    zero: usize,
    /// Sum over all registers `r` of `2^-r`.
    sum: f64,
    /// Precision: the sketch has `2^p` registers (`with_p` and `load` enforce 4..=16).
    p: u8,
    /// Register array of length `2^p`; `push` and `union` keep the maximum rank per register.
    m: Box<[u8]>,
    /// Ties the sketch to `V` without storing any `V`.
    marker: PhantomData<fn(V)>,
}
impl<V: ?Sized> HyperLogLog<V>
where
    V: Hash,
{
    /// Creates an empty sketch whose standard error is at most roughly `error_rate`.
    ///
    /// The precision is `p = ceil(2 * log2(1.04 / error_rate))`. That is the smallest
    /// `p` for which the HyperLogLog standard error `1.04 / sqrt(2^p)` does not exceed
    /// `error_rate`.
    ///
    /// # Panics
    ///
    /// Panics if `error_rate` is not strictly between 0 and 1 (including NaN). Also
    /// panics if the derived `p` falls outside `4..=16`, the range covered by the
    /// bias/threshold tables.
    pub fn new(error_rate: f64) -> Self {
        assert!(0.0 < error_rate && error_rate < 1.0);
        let p = f64_to_u8((f64::log2(1.04 / error_rate) * 2.0).ceil());
        assert!(
            (4..=16).contains(&p),
            "computed p={} from error_rate is outside supported range [4,16]",
            p
        );
        Self::with_p(p)
    }

    /// Creates an empty sketch with `2^p` registers.
    ///
    /// # Panics
    ///
    /// Panics if `p` is outside `4..=16`.
    pub fn with_p(p: u8) -> Self {
        assert!(
            (4..=16).contains(&p),
            "p out of supported range for bias/threshold tables (have 4..=16)"
        );
        let m = 1usize << p;
        Self {
            alpha: Self::get_alpha(p),
            // All registers start at 0: each counts as "zero" and adds 2^0 = 1 to `sum`.
            zero: m,
            sum: usize_to_f64(m),
            p,
            m: vec![0u8; m].into_boxed_slice(),
            marker: PhantomData,
        }
    }

    /// Creates an empty sketch with the same `alpha`, `p` and register count as `hll`.
    /// The result can later be `union`ed or `intersect`ed with `hll`.
    pub fn new_from(hll: &Self) -> Self {
        let m = hll.m.len();
        Self {
            alpha: hll.alpha,
            zero: m,
            sum: usize_to_f64(m),
            p: hll.p,
            m: vec![0; m].into_boxed_slice(),
            marker: PhantomData,
        }
    }

    /// Adds `value` to the sketch.
    ///
    /// The value is hashed through an XXH3 `Hasher` and the resulting 64-bit hash is
    /// passed to [`push_hash64`](Self::push_hash64). The two entry points therefore
    /// update the registers identically for the same hash.
    #[inline]
    pub fn push(&mut self, value: &V) {
        let mut hasher = Xxh3::default();
        value.hash(&mut hasher);
        self.push_hash64(hasher.finish());
    }

    /// Adds a pre-computed 64-bit hash to the sketch.
    ///
    /// The low `p` bits select the register. The remaining high bits give the rank that
    /// is max-merged into that register. `zero` and `sum` are updated incrementally.
    pub fn push_hash64(&mut self, x: u64) {
        let index = (x & ((self.m.len() as u64) - 1)) as usize;
        let rank = Self::get_rho(x >> self.p, 64 - self.p);
        let register = &mut self.m[index];
        let old = *register;
        let new = old.max(rank);
        if old == 0 {
            // `rank >= 1`, so a zero register always becomes non-zero here.
            self.zero -= 1;
        }
        // Replace this register's 2^-old contribution with 2^-new.
        self.sum -= Self::pow2_neg(old) - Self::pow2_neg(new);
        *register = new;
    }

    /// Returns the estimated number of distinct values pushed so far.
    ///
    /// Follows the HyperLogLog++ scheme. While some registers are still zero, linear
    /// counting (`m * ln(m / zero)`) is used if it is at or below the
    /// precision-specific threshold. Otherwise the bias-corrected raw estimate is
    /// returned.
    pub fn len(&self) -> f64 {
        if self.zero > 0 {
            let m = usize_to_f64(self.m.len());
            let linear = m * (m / usize_to_f64(self.zero)).ln();
            if linear <= Self::get_threshold(self.p) {
                return linear;
            }
        }
        self.ep()
    }

    /// Returns `true` when every register is zero (e.g. a fresh or cleared sketch).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.zero == self.m.len()
    }

    /// Merges `src` into `self` so that `self` estimates the cardinality of the union.
    ///
    /// Each register becomes the maximum of the two; `zero` and `sum` are recomputed.
    ///
    /// # Panics
    ///
    /// Panics if `src` differs from `self` in `alpha`, `p`, or register count.
    pub fn union(&mut self, src: &Self) {
        self.assert_compatible(src);
        #[cfg(feature = "stdsimd")]
        {
            let (zero, sum) = simd_union_assign::<64>(&mut self.m, &src.m);
            self.zero = zero;
            self.sum = sum;
        }
        #[cfg(not(feature = "stdsimd"))]
        {
            self.merge_registers(src, cmp::max);
        }
    }

    /// Replaces each register with the minimum of `self`'s and `src`'s register.
    /// `zero` and `sum` are then recomputed.
    ///
    /// Register-wise minimum only approximates the intersection of the two streams.
    ///
    /// # Panics
    ///
    /// Panics if `src` differs from `self` in `alpha`, `p`, or register count.
    pub fn intersect(&mut self, src: &Self) {
        self.assert_compatible(src);
        #[cfg(feature = "stdsimd")]
        {
            let (zero, sum) = simd_intersect_assign::<64>(&mut self.m, &src.m);
            self.zero = zero;
            self.sum = sum;
        }
        #[cfg(not(feature = "stdsimd"))]
        {
            self.merge_registers(src, cmp::min);
        }
    }

    /// Resets the sketch to empty without reallocating the register array.
    pub fn clear(&mut self) {
        self.zero = self.m.len();
        self.sum = usize_to_f64(self.m.len());
        self.m.fill(0);
    }

    /// Asserts that `src` has the same shape as `self`, so register-wise merges line up.
    fn assert_compatible(&self, src: &Self) {
        assert_eq!(src.alpha, self.alpha);
        assert_eq!(src.p, self.p);
        assert_eq!(src.m.len(), self.m.len());
    }

    /// Scalar merge path: combines `src` into `self` register-by-register with `merge`.
    /// It then recomputes `zero` and `sum` from the merged registers.
    #[cfg(not(feature = "stdsimd"))]
    fn merge_registers(&mut self, src: &Self, merge: impl Fn(u8, u8) -> u8) {
        let mut zero = 0usize;
        let mut sum = 0.0f64;
        for (to, &from) in self.m.iter_mut().zip(src.m.iter()) {
            *to = merge(*to, from);
            if *to == 0 {
                zero += 1;
            }
            sum += Self::pow2_neg(*to);
        }
        self.zero = zero;
        self.sum = sum;
    }

    /// Exact `2^-r`, built from bits instead of `powi`.
    ///
    /// The low 10 bits of `u64::MAX - r` equal `1023 - r`. Shifting left 54 and then
    /// right 2 moves them into the f64 exponent field with a zero sign and mantissa,
    /// which yields `2^(1023 - r - 1023)`.
    #[inline]
    fn pow2_neg(r: u8) -> f64 {
        f64::from_bits(u64::MAX.wrapping_sub(u64::from(r)) << 54 >> 2)
    }

    /// Linear-counting cutoff for precision `p` (the table is indexed from `p = 4`).
    #[inline]
    fn get_threshold(p: u8) -> f64 {
        TRESHOLD_DATA[usize::from(p - 4)]
    }

    /// HyperLogLog bias-correction constant α for `2^p` registers.
    fn get_alpha(p: u8) -> f64 {
        assert!(4 <= p && p <= 16);
        match p {
            4 => 0.673,
            5 => 0.697,
            6 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / u64_to_f64(1_u64 << p)),
        }
    }

    /// Rank of `w` within its low `max_width` bits.
    ///
    /// The rank is the 1-based position of the leftmost set bit, counted from the top
    /// of that window. It is `max_width + 1` when `w == 0`. Expects `w < 2^max_width`.
    #[inline]
    fn get_rho(w: u64, max_width: u8) -> u8 {
        let rho = max_width - (64 - u8::try_from(w.leading_zeros()).unwrap()) + 1;
        assert!(0 < rho && rho < 65);
        rho
    }

    /// Empirical bias for raw estimate `e` at precision `p`.
    ///
    /// This is the mean of the bias entries at the six raw-estimate table points
    /// nearest to `e`.
    fn estimate_bias(e: f64, p: u8) -> f64 {
        let table = usize::from(p - 4);
        let bias_vector = BIAS_DATA[table];
        let neighbors = Self::get_nearest_neighbors(e, RAW_ESTIMATE_DATA[table]);
        assert_eq!(neighbors.len(), 6);
        bias_vector[neighbors].iter().sum::<f64>() / 6.0_f64
    }

    /// Maps a bincode error to `io::Error`.
    ///
    /// Underlying I/O errors are returned unchanged, preserving their kind and OS
    /// error code. All other decoding or encoding failures become `ErrorKind::Other`.
    #[cfg(feature = "serde")]
    fn bincode_to_io(err: bincode::Error) -> std::io::Error {
        match *err {
            bincode::ErrorKind::Io(ioe) => ioe,
            other => std::io::Error::new(std::io::ErrorKind::Other, other),
        }
    }

    /// Serializes the sketch with bincode into `writer`.
    ///
    /// # Errors
    ///
    /// Returns the writer's own I/O error on write failure; other bincode failures map
    /// to `ErrorKind::Other`.
    #[cfg(feature = "serde")]
    pub fn save<W: std::io::Write>(&self, mut writer: W) -> std::io::Result<()> {
        bincode::serialize_into(&mut writer, self).map_err(Self::bincode_to_io)
    }

    /// Reads one bincode-encoded sketch from `reader` and validates it.
    ///
    /// The derived fields (`alpha`, `zero`, `sum`) are recomputed from `p` and the
    /// registers rather than trusted from the input.
    ///
    /// # Errors
    ///
    /// - The reader's own I/O error on read failure.
    /// - `InvalidData` if `p` is outside `4..=16`, the register count is not `2^p`, or
    ///   a register exceeds the largest rank `push_hash64` can produce (`65 - p`).
    /// - `Other` for any other bincode decoding failure.
    #[cfg(feature = "serde")]
    pub fn load<R: std::io::Read>(mut reader: R) -> std::io::Result<Self> {
        use std::io;
        let mut sketch: Self =
            bincode::deserialize_from::<_, Self>(&mut reader).map_err(Self::bincode_to_io)?;
        if !(4..=16).contains(&sketch.p) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("invalid p={} (expected 4..=16)", sketch.p),
            ));
        }
        let expected_m = 1usize << sketch.p;
        if sketch.m.len() != expected_m {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("state length {} does not match 2^p ({})", sketch.m.len(), expected_m),
            ));
        }
        // `get_rho(_, 64 - p)` never exceeds 65 - p; anything larger is corrupt input.
        let max_rank = 65 - sketch.p;
        if let Some(&bad) = sketch.m.iter().find(|&&r| r > max_rank) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "register value {} exceeds maximum rank {} for p={}",
                    bad, max_rank, sketch.p
                ),
            ));
        }
        sketch.alpha = Self::get_alpha(sketch.p);
        sketch.zero = sketch.m.iter().filter(|&&r| r == 0).count();
        sketch.sum = sketch.m.iter().map(|&r| Self::pow2_neg(r)).sum::<f64>();
        Ok(sketch)
    }

    /// Returns the index range of the (up to) six entries in `estimate_vector` closest
    /// to `e`.
    ///
    /// `estimate_vector` is binary-searched, so it is assumed to be sorted ascending.
    /// The search starts from a window of at most 12 entries around `e`'s insertion
    /// point. It repeatedly drops whichever endpoint is farther from `e` (ties drop the
    /// upper end) until six remain. A table shorter than six yields a shorter range
    /// instead of reading out of bounds.
    fn get_nearest_neighbors(e: f64, estimate_vector: &[f64]) -> Range<usize> {
        let index = estimate_vector
            .binary_search_by(|a| a.partial_cmp(&e).unwrap_or(Ordering::Equal))
            .unwrap_or_else(identity);
        let mut min = index.saturating_sub(6);
        let mut max = cmp::min(index + 6, estimate_vector.len());
        // `min < max <= len` holds on every iteration, so plain indexing never panics.
        while max - min > 6 {
            let min_val = estimate_vector[min];
            let max_val = estimate_vector[max - 1];
            if 2.0 * e - min_val > max_val {
                min += 1;
            } else {
                max -= 1;
            }
        }
        min..max
    }

    /// Raw HyperLogLog estimate `alpha * m^2 / sum`.
    ///
    /// Empirical bias correction is applied when the estimate is at most `5m`.
    #[inline]
    fn ep(&self) -> f64 {
        let m = usize_to_f64(self.m.len());
        // Square in f64: `m * m` in usize overflows on 32-bit targets at p = 16 (2^32).
        // Multiplying by the power of two `m` is exact, so results are otherwise unchanged.
        let e = self.alpha * m * m / self.sum;
        if e <= 5.0 * m {
            e - Self::estimate_bias(e, self.p)
        } else {
            e
        }
    }
}
impl<V: ?Sized> Clone for HyperLogLog<V> {
    /// Copies the sketch, including a fresh copy of the register array.
    ///
    /// Implemented by hand so that cloning does not require `V: Clone`.
    fn clone(&self) -> Self {
        Self {
            m: self.m.clone(),
            marker: PhantomData,
            // Remaining fields (`alpha`, `zero`, `sum`, `p`) are `Copy`.
            ..*self
        }
    }
}
impl<V: ?Sized> fmt::Debug for HyperLogLog<V>
where
    V: Hash,
{
    /// Shows only the current cardinality estimate, not the registers.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let estimate = self.len();
        let mut out = fmt.debug_struct("HyperLogLog");
        out.field("len", &estimate);
        out.finish()
    }
}
impl<V: ?Sized> New for HyperLogLog<V>
where
    V: Hash,
{
    /// Target error rate passed to the inherent `HyperLogLog::new`.
    type Config = f64;

    fn new(config: &Self::Config) -> Self {
        let error_rate: f64 = *config;
        Self::new(error_rate)
    }
}
impl<V: ?Sized> Intersect for HyperLogLog<V>
where
    V: Hash,
{
    /// Folds all sketches together with the inherent register-wise `intersect`.
    /// Returns `None` if `iter` is empty.
    fn intersect<'a>(mut iter: impl Iterator<Item = &'a Self>) -> Option<Self>
    where
        Self: Sized + 'a,
    {
        let first = iter.next()?.clone();
        let merged = iter.fold(first, |mut acc, next| {
            acc.intersect(next);
            acc
        });
        Some(merged)
    }
}
impl<'a, V: ?Sized> UnionAssign<&'a HyperLogLog<V>> for HyperLogLog<V>
where
    V: Hash,
{
    /// Delegates to the inherent [`HyperLogLog::union`].
    fn union_assign(&mut self, rhs: &'a Self) {
        Self::union(self, rhs);
    }
}
impl<'a, V: ?Sized> ops::AddAssign<&'a V> for HyperLogLog<V>
where
    V: Hash,
{
    /// `sketch += &value` is shorthand for `sketch.push(&value)`.
    fn add_assign(&mut self, rhs: &'a V) {
        Self::push(self, rhs);
    }
}
impl<'a, V: ?Sized> ops::AddAssign<&'a Self> for HyperLogLog<V>
where
    V: Hash,
{
    /// `sketch += &other` is shorthand for `sketch.union(&other)`.
    fn add_assign(&mut self, rhs: &'a Self) {
        Self::union(self, rhs);
    }
}
impl<V> IntersectPlusUnionIsPlus for HyperLogLog<V>
where
    V: ?Sized,
{
    /// Sets the crate's marker to `true` for this sketch (semantics defined in `crate::traits`).
    const VAL: bool = true;
}
#[cfg(feature = "stdsimd")]
mod simd_portable {
    //! Portable-SIMD kernels that merge two register arrays in place.
    //! They return the merged array's zero-register count and `sum(2^-r)`.

    use std::simd::{
        num::SimdUint,
        prelude::{SimdFloat, SimdOrd, SimdPartialEq},
        LaneCount, Simd, SupportedLaneCount,
    };

    /// Exact `2^-r` via the scalar bit construction: zero mantissa, biased exponent
    /// `1023 - r`.
    #[inline]
    fn pow2_neg(r: u8) -> f64 {
        f64::from_bits(u64::MAX.wrapping_sub(u64::from(r)) << 54 >> 2)
    }

    /// Sum of `2^-r` over all lanes of `v`.
    ///
    /// Lanes are widened to `u64` and reinterpreted as `f64`, so every term and the
    /// reduction use the same precision as the scalar tail and as the incremental
    /// updates in `push_hash64`. An `f32` accumulator would drop small terms next to
    /// large ones (e.g. `2^-30` beside `1.0`).
    #[inline]
    fn powbits_sum<const N: usize>(v: Simd<u8, N>) -> f64
    where
        LaneCount<N>: SupportedLaneCount,
    {
        let x: Simd<u64, N> = v.cast();
        let bits = ((Simd::splat(u64::MAX) - x) << Simd::splat(54)) >> Simd::splat(2);
        let f: Simd<f64, N> = Simd::from_bits(bits);
        f.reduce_sum()
    }

    /// Number of lanes of `v` equal to 0.
    #[inline]
    fn zero_lanes<const N: usize>(v: Simd<u8, N>) -> usize
    where
        LaneCount<N>: SupportedLaneCount,
    {
        let m = v.simd_eq(Simd::splat(0));
        m.to_bitmask().count_ones() as usize
    }

    /// Shared kernel: writes `op(a[i], b[i])` into `a[i]` for every register.
    ///
    /// `lanes` handles full `N`-wide chunks and `scalar` handles the tail. Returns
    /// `(zero_registers, sum_of_2^-r)` for the merged registers. Registers beyond the
    /// shorter slice are left untouched, as `zip` stops there.
    #[inline]
    fn merge_assign<const N: usize, L, S>(
        a: &mut [u8],
        b: &[u8],
        lanes: L,
        scalar: S,
    ) -> (usize, f64)
    where
        LaneCount<N>: SupportedLaneCount,
        L: Fn(Simd<u8, N>, Simd<u8, N>) -> Simd<u8, N>,
        S: Fn(u8, u8) -> u8,
    {
        debug_assert_eq!(a.len(), b.len());
        let (a_chunks, a_rem) = a.as_chunks_mut::<N>();
        let (b_chunks, b_rem) = b.as_chunks::<N>();
        let mut zero = 0usize;
        let mut sum = 0.0f64;
        for (aa, bb) in a_chunks.iter_mut().zip(b_chunks) {
            let rv = lanes(Simd::from_array(*aa), Simd::from_array(*bb));
            zero += zero_lanes(rv);
            sum += powbits_sum(rv);
            *aa = rv.to_array();
        }
        for (to, &from) in a_rem.iter_mut().zip(b_rem) {
            let v = scalar(*to, from);
            *to = v;
            if v == 0 {
                zero += 1;
            }
            sum += pow2_neg(v);
        }
        (zero, sum)
    }

    /// Register-wise maximum of `a` and `b`, stored into `a` (sketch union).
    pub fn simd_union_assign<const N: usize>(a: &mut [u8], b: &[u8]) -> (usize, f64)
    where
        LaneCount<N>: SupportedLaneCount,
    {
        merge_assign::<N, _, _>(a, b, |x, y| x.simd_max(y), u8::max)
    }

    /// Register-wise minimum of `a` and `b`, stored into `a` (sketch intersection).
    pub fn simd_intersect_assign<const N: usize>(a: &mut [u8], b: &[u8]) -> (usize, f64)
    where
        LaneCount<N>: SupportedLaneCount,
    {
        merge_assign::<N, _, _>(a, b, |x, y| x.simd_min(y), u8::min)
    }
}
#[cfg(feature = "stdsimd")]
use simd_portable::{simd_intersect_assign, simd_union_assign};
#[cfg(test)]
mod test {
    use super::{super::f64_to_usize, HyperLogLog};

    /// Scratch file path in the OS temp directory. The process id is appended so
    /// concurrent test binaries do not clobber each other's files.
    #[cfg(feature = "serde")]
    fn scratch_path(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("{}_{}.bin", name, std::process::id()))
    }

    /// The `2^-x` bit trick agrees with `powi` in both the f64 and the f32 form.
    #[test]
    fn pow_bithack() {
        for x in 0_u8..65 {
            let a = 2.0_f64.powi(-(i32::from(x)));
            let b = f64::from_bits(u64::MAX.wrapping_sub(u64::from(x)) << 54 >> 2);
            let c = f32::from_bits(u32::MAX.wrapping_sub(u32::from(x)) << 25 >> 2);
            assert_eq!(a, b);
            assert_eq!(a, f64::from(c));
        }
    }

    #[test]
    fn hyperloglog_test_simple() {
        let mut hll = HyperLogLog::new(0.00408);
        let keys = ["test1", "test2", "test3", "test2", "test2", "test2"];
        for k in &keys {
            hll.push(k);
        }
        assert!((hll.len().round() - 3.0).abs() < f64::EPSILON);
        assert!(!hll.is_empty());
        hll.clear();
        assert!(hll.is_empty());
        assert!(hll.len() == 0.0);
    }

    #[test]
    fn hyperloglog_test_merge() {
        let mut hll = HyperLogLog::new(0.00408);
        let keys = ["test1", "test2", "test3", "test2", "test2", "test2"];
        for k in &keys {
            hll.push(k);
        }
        assert!((hll.len().round() - 3.0).abs() < f64::EPSILON);
        let mut hll2 = HyperLogLog::new_from(&hll);
        let keys2 = ["test3", "test4", "test4", "test4", "test4", "test1"];
        for k in &keys2 {
            hll2.push(k);
        }
        assert!((hll2.len().round() - 3.0).abs() < f64::EPSILON);
        hll.union(&hll2);
        assert!((hll.len().round() - 4.0).abs() < f64::EPSILON);
    }

    #[test]
    fn push() {
        let actual = 100_000.0;
        let p = 0.05;
        let mut hll = HyperLogLog::new(p);
        for i in 0..f64_to_usize(actual) {
            hll.push(&i);
        }
        assert!(hll.len() > (actual - (actual * p * 3.0)));
        assert!(hll.len() < (actual + (actual * p * 3.0)));
    }

    /// Pushing a value and pushing its XXH3 hash must produce identical sketches.
    #[test]
    fn push_hash64() {
        use std::hash::{Hash, Hasher};
        use xxhash_rust::xxh3::Xxh3;
        let mut h_from_val = HyperLogLog::new(0.01);
        let mut h_from_hash = HyperLogLog::new_from(&h_from_val);
        for i in 0u64..10_000 {
            let mut hasher = Xxh3::default();
            i.hash(&mut hasher);
            let x = hasher.finish();
            h_from_val.push(&i);
            h_from_hash.push_hash64(x);
        }
        let a = h_from_val.len();
        let b = h_from_hash.len();
        assert!((a - b).abs() < f64::EPSILON, "len mismatch: a={a}, b={b}");
        assert!(a > 9000.0 && a < 11000.0, "unexpected estimate: {a}");
    }

    #[cfg(feature = "serde")]
    #[test]
    fn hll_save_and_load_roundtrip() {
        use std::fs::{remove_file, File};
        use std::io::{BufReader, BufWriter, Write};
        let mut hll = HyperLogLog::<u64>::with_p(12);
        for i in 0u64..10_000 {
            hll.push(&i);
        }
        let est_before = hll.len();
        let path = scratch_path("test_hll");
        {
            let f = File::create(&path).expect("create file");
            let mut w = BufWriter::new(f);
            hll.save(&mut w).expect("save hll");
            // Flush explicitly: dropping a BufWriter silently discards write errors.
            w.flush().expect("flush hll");
        }
        let loaded = {
            let f = File::open(&path).expect("open file");
            let r = BufReader::new(f);
            HyperLogLog::<u64>::load(r).expect("load hll")
        };
        // Clean up before asserting so a failure does not leave the file behind.
        remove_file(&path).ok();
        assert_eq!(loaded.p, hll.p);
        assert_eq!(loaded.m.len(), hll.m.len());
        assert_eq!(&*loaded.m, &*hll.m, "register arrays differ after load");
        let est_after = loaded.len();
        let diff = (est_after - est_before).abs();
        assert!(
            diff <= 1e-12,
            "estimate drift too large: before={est_before}, after={est_after}, diff={diff}"
        );
    }

    #[cfg(feature = "serde")]
    #[test]
    fn hll_save_two_then_load_back_streaming_cardinality() -> std::io::Result<()> {
        use std::fs::{remove_file, File};
        use std::io::{BufReader, BufWriter, Write};
        let mut s1 = HyperLogLog::<u64>::with_p(12);
        let mut s2 = HyperLogLog::<u64>::with_p(12);
        for i in 0u64..10_000 {
            s1.push(&i);
        }
        for i in 5_000u64..15_000 {
            s2.push(&i);
        }
        let t1 = 10_000.0;
        let t2 = 10_000.0;
        let tu = 15_000.0;
        let rel_tol = 0.05;
        let e1 = s1.len();
        let e2 = s2.len();
        let mut u = s1.clone();
        u.union(&s2);
        let eu = u.len();
        assert!((e1 - t1).abs() <= t1 * rel_tol, "s1 est={} truth={}", e1, t1);
        assert!((e2 - t2).abs() <= t2 * rel_tol, "s2 est={} truth={}", e2, t2);
        assert!((eu - tu).abs() <= tu * rel_tol, "union est={} truth={}", eu, tu);
        let path = scratch_path("test_hll_stream_two_card");
        {
            let f = File::create(&path)?;
            let mut w = BufWriter::new(f);
            s1.save(&mut w)?;
            s2.save(&mut w)?;
            w.flush()?;
        }
        // Read both sketches back from one stream, then remove the file before any
        // assertion can fail.
        let loaded = (|| {
            let f = File::open(&path)?;
            let mut r = BufReader::new(f);
            let a = HyperLogLog::<u64>::load(&mut r)?;
            let b = HyperLogLog::<u64>::load(&mut r)?;
            Ok::<_, std::io::Error>((a, b))
        })();
        remove_file(&path).ok();
        let (s1b, s2b) = loaded?;
        let e1b = s1b.len();
        let e2b = s2b.len();
        let mut ub = s1b.clone();
        ub.union(&s2b);
        let eub = ub.len();
        assert!((e1b - t1).abs() <= t1 * rel_tol, "s1(load) est={} truth={}", e1b, t1);
        assert!((e2b - t2).abs() <= t2 * rel_tol, "s2(load) est={} truth={}", e2b, t2);
        assert!((eub - tu).abs() <= tu * rel_tol, "union(load) est={} truth={}", eub, tu);
        assert!((e1 - e1b).abs() < 1e-9, "s1 est changed by save/load");
        assert!((e2 - e2b).abs() < 1e-9, "s2 est changed by save/load");
        assert!((eu - eub).abs() < 1e-9, "union est changed by save/load");
        Ok(())
    }
}