use serde::{Deserialize, Serialize};
use std::{
cmp::{self, Ordering}, convert::{identity, TryFrom}, fmt, hash::{Hash, Hasher}, marker::PhantomData, ops::{self, Range}
};
use twox_hash::XxHash;
use super::{f64_to_u8, u64_to_f64, usize_to_f64};
use crate::traits::{Intersect, IntersectPlusUnionIsPlus, New, UnionAssign};
mod consts;
use self::consts::{BIAS_DATA, RAW_ESTIMATE_DATA, TRESHOLD_DATA};
/// Like [`HyperLogLog`], but with `Ord`/`Eq` implemented on the *estimated
/// cardinality* (`len()`), so sketches can be compared/sorted by magnitude.
#[derive(Serialize, Deserialize)]
#[serde(bound = "")]
pub struct HyperLogLogMagnitude<V>(HyperLogLog<V>);
impl<V: Hash> Ord for HyperLogLogMagnitude<V> {
    /// Total order on the estimated cardinality of the two sketches.
    #[inline(always)]
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs = self.0.len();
        let rhs = other.0.len();
        // NOTE(review): unwraps the f64 comparison — panics if an estimate is
        // ever NaN; assumed not to occur for well-formed sketches.
        lhs.partial_cmp(&rhs).unwrap()
    }
}
impl<V: Hash> PartialOrd for HyperLogLogMagnitude<V> {
    /// Partial order on the estimated cardinality of the two sketches.
    #[inline(always)]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = self.0.len();
        let rhs = other.0.len();
        lhs.partial_cmp(&rhs)
    }
}
impl<V: Hash> PartialEq for HyperLogLogMagnitude<V> {
    /// Two sketches compare equal when their cardinality estimates are
    /// bit-for-bit equal f64s.
    #[inline(always)]
    fn eq(&self, other: &Self) -> bool {
        self.0.len() == other.0.len()
    }
}
// Marker: equality on `len()` is assumed total (no NaN estimates).
impl<V: Hash> Eq for HyperLogLogMagnitude<V> {}
impl<V: Hash> Clone for HyperLogLogMagnitude<V> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<V: Hash> New for HyperLogLogMagnitude<V> {
type Config = f64;
fn new(config: &Self::Config) -> Self {
Self(New::new(config))
}
}
impl<V: Hash> Intersect for HyperLogLogMagnitude<V> {
fn intersect<'a>(iter: impl Iterator<Item = &'a Self>) -> Option<Self>
where
Self: Sized + 'a,
{
Intersect::intersect(iter.map(|x| &x.0)).map(Self)
}
}
impl<'a, V: Hash> UnionAssign<&'a HyperLogLogMagnitude<V>> for HyperLogLogMagnitude<V> {
fn union_assign(&mut self, rhs: &'a Self) {
self.0.union_assign(&rhs.0)
}
}
impl<'a, V: Hash> ops::AddAssign<&'a V> for HyperLogLogMagnitude<V> {
    /// Pushes a single value into the wrapped sketch.
    fn add_assign(&mut self, rhs: &'a V) {
        self.0 += rhs;
    }
}
impl<'a, V: Hash> ops::AddAssign<&'a Self> for HyperLogLogMagnitude<V> {
    /// `+=` on two sketches is a union, delegated to the wrapped sketch.
    fn add_assign(&mut self, rhs: &'a Self) {
        self.0 += &rhs.0;
    }
}
impl<V: Hash> fmt::Debug for HyperLogLogMagnitude<V> {
    /// Transparent: renders exactly like the wrapped `HyperLogLog`.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.0, fmt)
    }
}
impl<V> IntersectPlusUnionIsPlus for HyperLogLogMagnitude<V> {
    // Forward the flag from the wrapped `HyperLogLog` (see
    // `crate::traits::IntersectPlusUnionIsPlus` for its meaning).
    const VAL: bool = <HyperLogLog<V> as IntersectPlusUnionIsPlus>::VAL;
}
/// An implementation of the HyperLogLog data structure with bias correction.
///
/// Estimates the number of distinct values pushed, using `O(2^p)` bytes of
/// register storage.
#[derive(Serialize, Deserialize)]
#[serde(bound = "")]
pub struct HyperLogLog<V: ?Sized> {
    /// Bias-correction constant α, fixed by `p` (see `get_alpha`).
    alpha: f64,
    /// Count of registers currently equal to zero (maintained incrementally).
    zero: usize,
    /// Running Σ 2^-m[i] over all registers (maintained incrementally).
    sum: f64,
    /// log2 of the register count; `m.len() == 1 << p`.
    p: u8,
    /// The 2^p registers.
    m: Box<[u8]>,
    /// `fn(V)` keeps the type parameter without owning a `V` (covariance-free,
    /// `Send`/`Sync` regardless of `V`).
    marker: PhantomData<fn(V)>,
}
impl<V: ?Sized> HyperLogLog<V>
where
    V: Hash,
{
    /// Create a counter sized so the relative error of the estimate is
    /// approximately `error_rate`.
    ///
    /// # Panics
    /// Panics unless `0.0 < error_rate < 1.0`; also panics (via `get_alpha`)
    /// if the derived precision falls outside `4..=16`.
    pub fn new(error_rate: f64) -> Self {
        assert!(0.0 < error_rate && error_rate < 1.0);
        // Standard HLL sizing: p = ceil(log2((1.04 / error_rate)^2)).
        let p = f64_to_u8((f64::log2(1.04 / error_rate) * 2.0).ceil());
        assert!(0 < p && p < 64);
        let alpha = Self::get_alpha(p);
        Self {
            alpha,
            // All 2^p registers start at zero, so zero = m.len() and
            // sum = Σ 2^-0 = 2^p.
            zero: 1 << p,
            sum: f64::from(1 << p),
            p,
            m: vec![0; 1 << p].into_boxed_slice(),
            marker: PhantomData,
        }
    }
    /// Create an *empty* counter with the same parameters (`alpha`, `p`,
    /// register count) as `hll`, suitable for later `union`/`intersect`.
    pub fn new_from(hll: &Self) -> Self {
        Self {
            alpha: hll.alpha,
            zero: hll.m.len(),
            sum: usize_to_f64(hll.m.len()),
            p: hll.p,
            m: vec![0; hll.m.len()].into_boxed_slice(),
            marker: PhantomData,
        }
    }
    /// "Visit" an element: hash it and update the selected register.
    #[inline]
    pub fn push(&mut self, value: &V) {
        let mut hasher = XxHash::default();
        value.hash(&mut hasher);
        let x = hasher.finish();
        // Low p bits of the hash select the register (m.len() is a power of two).
        let j = x & (self.m.len() as u64 - 1);
        // Remaining bits feed the leading-zero count.
        let w = x >> self.p;
        let rho = Self::get_rho(w, 64 - self.p);
        let mjr = &mut self.m[usize::try_from(j).unwrap()];
        let old = *mjr;
        let new = cmp::max(old, rho);
        // Incrementally maintain the zero-register count and sum = Σ 2^-m[i].
        self.zero -= if old == 0 { 1 } else { 0 };
        // Bit trick: `(u64::MAX - k) << 54 >> 2` is the IEEE-754 bit pattern
        // of 2^-k for k in 0..=64 (verified by the `pow_bithack` test below).
        self.sum -= f64::from_bits(u64::max_value().wrapping_sub(u64::from(old)) << 54 >> 2)
            - f64::from_bits(u64::max_value().wrapping_sub(u64::from(new)) << 54 >> 2);
        *mjr = new;
    }
    /// Estimated number of distinct elements pushed so far.
    pub fn len(&self) -> f64 {
        let v = self.zero;
        if v > 0 {
            // Small-range correction: linear counting, h = m * ln(m / V)
            // where V is the number of zero registers.
            let h =
                usize_to_f64(self.m.len()) * (usize_to_f64(self.m.len()) / usize_to_f64(v)).ln();
            // Threshold table is indexed by p - 4 (p is asserted >= 4).
            if h <= Self::get_threshold(self.p - 4) {
                return h;
            }
        }
        self.ep()
    }
    /// True iff no element has ever been pushed (all registers zero).
    pub fn is_empty(&self) -> bool {
        self.zero == self.m.len()
    }
    /// In-place union: registers become the element-wise max; `zero` and
    /// `sum` are recomputed from scratch. Both sketches must share parameters.
    pub fn union(&mut self, src: &Self) {
        assert_eq!(src.alpha, self.alpha);
        assert_eq!(src.p, self.p);
        assert_eq!(src.m.len(), self.m.len());
        // SIMD fast path: process u8s::lanes() registers per iteration.
        #[cfg(all(
            feature = "packed_simd",
            any(target_arch = "x86", target_arch = "x86_64")
        ))]
        {
            assert_eq!(self.m.len() % u8s::lanes(), 0);
            assert_eq!(u8s::lanes(), f32s::lanes() * 4);
            assert_eq!(f32s::lanes(), u32s::lanes());
            assert_eq!(u8sq::lanes(), u32s::lanes());
            let mut zero = u8s_sad_out::splat(0);
            let mut sum = f32s::splat(0.0);
            for i in (0..self.m.len()).step_by(u8s::lanes()) {
                unsafe {
                    // SAFETY: i + u8s::lanes() <= m.len() (asserted divisible above).
                    let self_m = u8s::from_slice_unaligned_unchecked(self.m.get_unchecked(i..));
                    let src_m = u8s::from_slice_unaligned_unchecked(src.m.get_unchecked(i..));
                    let res = self_m.max(src_m);
                    res.write_to_slice_unaligned_unchecked(self.m.get_unchecked_mut(i..));
                    // Count zero lanes: eq-mask is all-ones (255) per zero lane;
                    // 0 - mask yields 1 per zero lane, then SAD sums the lanes.
                    let count: u8s = u8s::splat(0) - u8s::from_bits(res.eq(u8s::splat(0)));
                    let count2 = Sad::<u8s>::sad(count, u8s::splat(0));
                    zero += count2;
                    // Accumulate Σ 2^-m[i] a quarter-vector at a time, using the
                    // f32 analogue of the pow bithack (see `pow_bithack` test).
                    for j in 0..4 {
                        let x = u8sq::from_slice_unaligned_unchecked(
                            self.m.get_unchecked(i + j * u8sq::lanes()..),
                        );
                        let x: u32s = x.cast();
                        let x: f32s = ((u32s::splat(u32::max_value()) - x) << 25 >> 2).into_bits();
                        sum += x;
                    }
                }
            }
            self.zero = usize::try_from(zero.wrapping_sum()).unwrap();
            self.sum = f64::from(sum.sum());
        }
        // Scalar fallback: same max/zero/sum computation, one register at a time.
        #[cfg(not(all(
            feature = "packed_simd",
            any(target_arch = "x86", target_arch = "x86_64")
        )))]
        {
            let mut zero = 0;
            let mut sum = 0.0;
            for (to, from) in self.m.iter_mut().zip(src.m.iter()) {
                *to = (*to).max(*from);
                zero += if *to == 0 { 1 } else { 0 };
                sum += f64::from_bits(u64::max_value().wrapping_sub(u64::from(*to)) << 54 >> 2);
            }
            self.zero = zero;
            self.sum = sum;
        }
    }
    /// In-place intersection: registers become the element-wise min.
    /// Mirrors `union` exactly except for min vs max.
    pub fn intersect(&mut self, src: &Self) {
        assert_eq!(src.alpha, self.alpha);
        assert_eq!(src.p, self.p);
        assert_eq!(src.m.len(), self.m.len());
        #[cfg(all(
            feature = "packed_simd",
            any(target_arch = "x86", target_arch = "x86_64")
        ))]
        {
            assert_eq!(self.m.len() % u8s::lanes(), 0);
            assert_eq!(u8s::lanes(), f32s::lanes() * 4);
            assert_eq!(f32s::lanes(), u32s::lanes());
            assert_eq!(u8sq::lanes(), u32s::lanes());
            let mut zero = u8s_sad_out::splat(0);
            let mut sum = f32s::splat(0.0);
            for i in (0..self.m.len()).step_by(u8s::lanes()) {
                unsafe {
                    // SAFETY: i + u8s::lanes() <= m.len() (asserted divisible above).
                    let self_m = u8s::from_slice_unaligned_unchecked(self.m.get_unchecked(i..));
                    let src_m = u8s::from_slice_unaligned_unchecked(src.m.get_unchecked(i..));
                    let res = self_m.min(src_m);
                    res.write_to_slice_unaligned_unchecked(self.m.get_unchecked_mut(i..));
                    let count: u8s = u8s::splat(0) - u8s::from_bits(res.eq(u8s::splat(0)));
                    let count2 = Sad::<u8s>::sad(count, u8s::splat(0));
                    zero += count2;
                    for j in 0..4 {
                        let x = u8sq::from_slice_unaligned_unchecked(
                            self.m.get_unchecked(i + j * u8sq::lanes()..),
                        );
                        let x: u32s = x.cast();
                        let x: f32s = ((u32s::splat(u32::max_value()) - x) << 25 >> 2).into_bits();
                        sum += x;
                    }
                }
            }
            self.zero = usize::try_from(zero.wrapping_sum()).unwrap();
            self.sum = f64::from(sum.sum());
        }
        #[cfg(not(all(
            feature = "packed_simd",
            any(target_arch = "x86", target_arch = "x86_64")
        )))]
        {
            let mut zero = 0;
            let mut sum = 0.0;
            for (to, from) in self.m.iter_mut().zip(src.m.iter()) {
                *to = (*to).min(*from);
                zero += if *to == 0 { 1 } else { 0 };
                sum += f64::from_bits(u64::max_value().wrapping_sub(u64::from(*to)) << 54 >> 2);
            }
            self.zero = zero;
            self.sum = sum;
        }
    }
    /// Reset to the empty state, restoring the `zero`/`sum` invariants.
    pub fn clear(&mut self) {
        self.zero = self.m.len();
        self.sum = usize_to_f64(self.m.len());
        self.m.iter_mut().for_each(|x| {
            *x = 0;
        });
    }
    /// Linear-counting/HLL crossover threshold; `p` here is already `self.p - 4`.
    fn get_threshold(p: u8) -> f64 {
        TRESHOLD_DATA[p as usize]
    }
    /// Bias-correction constant α for 2^p registers. Exact values for small p,
    /// the asymptotic formula otherwise.
    fn get_alpha(p: u8) -> f64 {
        assert!(4 <= p && p <= 16);
        match p {
            4 => 0.673,
            5 => 0.697,
            6 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / u64_to_f64(1_u64 << p)),
        }
    }
    /// Position of the leftmost 1-bit of `w` within a `max_width`-bit word
    /// (1-based); `max_width + 1` when `w`'s top `max_width` bits are all zero.
    fn get_rho(w: u64, max_width: u8) -> u8 {
        let rho = max_width - (64 - u8::try_from(w.leading_zeros()).unwrap()) + 1;
        assert!(0 < rho && rho < 65);
        rho
    }
    /// Estimated bias of the raw estimate `e`, from the empirical tables:
    /// mean of the 6 bias entries whose raw estimates are nearest to `e`.
    fn estimate_bias(e: f64, p: u8) -> f64 {
        let bias_vector = BIAS_DATA[(p - 4) as usize];
        let neighbors = Self::get_nearest_neighbors(e, RAW_ESTIMATE_DATA[(p - 4) as usize]);
        assert_eq!(neighbors.len(), 6);
        bias_vector[neighbors].iter().sum::<f64>() / 6.0_f64
    }
    /// Index range of the 6 entries of (sorted) `estimate_vector` closest to
    /// `e`. Assumes the vector has at least 6 entries (true for the bundled
    /// tables); otherwise the narrowing loop would not terminate.
    fn get_nearest_neighbors(e: f64, estimate_vector: &[f64]) -> Range<usize> {
        // Insertion point of e; Err(i) from binary_search is mapped to i.
        let index = estimate_vector
            .binary_search_by(|a| a.partial_cmp(&e).unwrap_or(Ordering::Equal))
            .unwrap_or_else(identity);
        // Start with a window of up to 12 entries around the insertion point,
        // then shrink from whichever end is farther from e until 6 remain.
        let mut min = if index > 6 { index - 6 } else { 0 };
        let mut max = cmp::min(index + 6, estimate_vector.len());
        while max - min != 6 {
            // SAFETY: 0 <= min < max <= estimate_vector.len() throughout —
            // min only increases and max only decreases while max - min > 6.
            let (min_val, max_val) = unsafe {
                (
                    *estimate_vector.get_unchecked(min),
                    *estimate_vector.get_unchecked(max - 1),
                )
            };
            // Drop the endpoint farther from e: compares (e - min_val) vs (max_val - e).
            if 2.0 * e - min_val > max_val {
                min += 1;
            } else {
                max -= 1;
            }
        }
        min..max
    }
    /// Bias-corrected raw estimate: E = α · m² / Σ 2^-m[i], with the empirical
    /// bias subtracted when E <= 5m (the range the tables cover).
    fn ep(&self) -> f64 {
        let e = self.alpha * usize_to_f64(self.m.len() * self.m.len()) / self.sum;
        if e <= usize_to_f64(5 * self.m.len()) {
            e - Self::estimate_bias(e, self.p)
        } else {
            e
        }
    }
}
impl<V: ?Sized> Clone for HyperLogLog<V> {
    /// Deep copy. Implemented manually (rather than derived) so no `V: Clone`
    /// bound is imposed — `V` is phantom and never stored.
    fn clone(&self) -> Self {
        let Self {
            alpha,
            zero,
            sum,
            p,
            ref m,
            marker: _,
        } = *self;
        Self {
            alpha,
            zero,
            sum,
            p,
            m: m.clone(),
            marker: PhantomData,
        }
    }
}
impl<V: ?Sized + Hash> fmt::Debug for HyperLogLog<V> {
    /// Renders as `HyperLogLog { len: <estimate> }` rather than dumping the
    /// raw registers.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = fmt.debug_struct("HyperLogLog");
        builder.field("len", &self.len());
        builder.finish()
    }
}
impl<V: ?Sized + Hash> New for HyperLogLog<V> {
    /// Config is the target relative error rate.
    type Config = f64;
    fn new(config: &Self::Config) -> Self {
        let error_rate = *config;
        // Delegates to the inherent constructor.
        Self::new(error_rate)
    }
}
impl<V: ?Sized + Hash> Intersect for HyperLogLog<V> {
    /// Folds the iterator with pairwise `intersect`, seeded with a clone of
    /// the first sketch; `None` when the iterator is empty.
    fn intersect<'a>(mut iter: impl Iterator<Item = &'a Self>) -> Option<Self>
    where
        Self: Sized + 'a,
    {
        let first = iter.next()?;
        let mut acc = first.clone();
        for next in iter {
            acc.intersect(next);
        }
        Some(acc)
    }
}
impl<'a, V: ?Sized + Hash> UnionAssign<&'a HyperLogLog<V>> for HyperLogLog<V> {
    /// In-place union; thin adapter over the inherent `union`.
    fn union_assign(&mut self, rhs: &'a Self) {
        Self::union(self, rhs)
    }
}
impl<'a, V: ?Sized + Hash> ops::AddAssign<&'a V> for HyperLogLog<V> {
    /// `sketch += &value` pushes a single element.
    fn add_assign(&mut self, rhs: &'a V) {
        Self::push(self, rhs)
    }
}
impl<'a, V: ?Sized + Hash> ops::AddAssign<&'a Self> for HyperLogLog<V> {
    /// `sketch += &other` is a union of the two sketches.
    fn add_assign(&mut self, rhs: &'a Self) {
        Self::union(self, rhs)
    }
}
impl<V: ?Sized> IntersectPlusUnionIsPlus for HyperLogLog<V> {
    // See `crate::traits::IntersectPlusUnionIsPlus` for the meaning of this
    // flag; `HyperLogLogMagnitude` forwards this value.
    const VAL: bool = true;
}
// SIMD support for the `union`/`intersect` hot loops. Only compiled when the
// `packed_simd` feature is enabled on x86/x86_64; the widest available vector
// types are chosen from the target features at compile time.
#[cfg(all(
    feature = "packed_simd",
    any(target_arch = "x86", target_arch = "x86_64")
))]
mod simd {
    pub use packed_simd::{self, Cast, FromBits, IntoBits};
    use std::marker::PhantomData;
    // AVX-512BW: 64-byte vectors.
    // NOTE(review): unlike the other variants below, this module lacks
    // `#![allow(non_camel_case_types)]` — confirm it lints cleanly when built
    // with avx512bw enabled.
    #[cfg(target_feature = "avx512bw")]
    mod simd_types {
        use super::packed_simd;
        pub type u8s = packed_simd::u8x64;
        pub type u8s_sad_out = packed_simd::u64x8;
        pub type f32s = packed_simd::f32x16;
        pub type u32s = packed_simd::u32x16;
        pub type u8sq = packed_simd::u8x16;
    }
    // AVX2: 32-byte vectors.
    #[cfg(target_feature = "avx2")]
    mod simd_types {
        #![allow(non_camel_case_types)]
        use super::packed_simd;
        pub type u8s = packed_simd::u8x32;
        pub type u8s_sad_out = packed_simd::u64x4;
        pub type f32s = packed_simd::f32x8;
        pub type u32s = packed_simd::u32x8;
        pub type u8sq = packed_simd::u8x8;
    }
    // SSE2 (without AVX2): 16-byte vectors.
    #[cfg(all(not(target_feature = "avx2"), target_feature = "sse2"))]
    mod simd_types {
        #![allow(non_camel_case_types)]
        use super::packed_simd;
        pub type u8s = packed_simd::u8x16;
        pub type u8s_sad_out = packed_simd::u64x2;
        pub type f32s = packed_simd::f32x4;
        pub type u32s = packed_simd::u32x4;
        pub type u8sq = packed_simd::u8x4;
    }
    // Fallback: 8-byte vectors, scalar SAD accumulator.
    #[cfg(all(not(target_feature = "avx2"), not(target_feature = "sse2")))]
    mod simd_types {
        #![allow(non_camel_case_types)]
        use super::packed_simd;
        pub type u8s = packed_simd::u8x8;
        pub type u8s_sad_out = u64;
        pub type f32s = packed_simd::f32x2;
        pub type u32s = packed_simd::u32x2;
        pub type u8sq = packed_simd::u8x2;
    }
    pub use self::simd_types::{f32s, u32s, u8s, u8s_sad_out, u8sq};
    /// Dispatch point for sum-of-absolute-differences; each width gets an
    /// impl backed by the matching CPU intrinsic (or a scalar loop).
    pub struct Sad<X>(PhantomData<fn(X)>);
    #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
    mod x86 {
        #[cfg(target_arch = "x86")]
        pub use std::arch::x86::*;
        #[cfg(target_arch = "x86_64")]
        pub use std::arch::x86_64::*;
    }
    #[cfg(target_feature = "avx2")]
    impl Sad<packed_simd::u8x32> {
        /// SAD of 32 u8 lanes via `vpsadbw`; with b == 0 this horizontally
        /// sums each 8-lane group into a u64.
        #[inline]
        #[target_feature(enable = "avx2")]
        pub unsafe fn sad(a: packed_simd::u8x32, b: packed_simd::u8x32) -> packed_simd::u64x4 {
            use std::mem::transmute;
            // SAFETY (caller): avx2 must be available (enforced by cfg +
            // target_feature). Transmutes are between same-size vector reprs.
            packed_simd::Simd(transmute(x86::_mm256_sad_epu8(
                transmute(a.0),
                transmute(b.0),
            )))
        }
    }
    #[cfg(target_feature = "sse2")]
    impl Sad<packed_simd::u8x16> {
        /// SAD of 16 u8 lanes via `psadbw`.
        #[inline]
        #[target_feature(enable = "sse2")]
        pub unsafe fn sad(a: packed_simd::u8x16, b: packed_simd::u8x16) -> packed_simd::u64x2 {
            use std::mem::transmute;
            // SAFETY (caller): sse2 must be available. Same-size transmutes.
            packed_simd::Simd(transmute(x86::_mm_sad_epu8(transmute(a.0), transmute(b.0))))
        }
    }
    // NOTE(review): `cfg(target_feature = "sse,mmx")` tests a single feature
    // literally named "sse,mmx", which is not a real target feature, so this
    // impl appears unreachable and the scalar fallback below is always used.
    // Likely intended: cfg(all(target_feature = "sse", target_feature = "mmx")).
    #[cfg(target_feature = "sse,mmx")]
    impl Sad<packed_simd::u8x8> {
        /// SAD of 8 u8 lanes via MMX `psadbw`.
        #[inline]
        #[target_feature(enable = "sse,mmx")]
        pub unsafe fn sad(a: packed_simd::u8x8, b: packed_simd::u8x8) -> u64 {
            use std::mem::transmute;
            transmute(x86::_mm_sad_pu8(transmute(a.0), transmute(b.0)))
        }
    }
    #[cfg(not(target_feature = "sse,mmx"))]
    impl Sad<packed_simd::u8x8> {
        /// Scalar fallback: only supports b == 0 (asserted), i.e. a plain
        /// horizontal sum of the 8 lanes.
        #[inline(always)]
        pub unsafe fn sad(a: packed_simd::u8x8, b: packed_simd::u8x8) -> u64 {
            assert_eq!(b, packed_simd::u8x8::splat(0));
            (0..8).map(|i| u64::from(a.extract(i))).sum()
        }
    }
}
#[cfg(all(
feature = "packed_simd",
any(target_arch = "x86", target_arch = "x86_64")
))]
use simd::{f32s, u32s, u8s, u8s_sad_out, u8sq, Cast, FromBits, IntoBits, Sad};
#[cfg(test)]
mod test {
    use super::{super::f64_to_usize, HyperLogLog};
    use std::f64;
    /// Verifies the bit tricks used throughout this file: for k in 0..=64,
    /// `(u64::MAX - k) << 54 >> 2` is exactly the f64 bit pattern of 2^-k,
    /// and `(u32::MAX - k) << 25 >> 2` is the f32 analogue.
    #[test]
    fn pow_bithack() {
        for x in 0_u8..65 {
            let a = 2.0_f64.powi(-(i32::from(x)));
            let b = f64::from_bits(u64::max_value().wrapping_sub(u64::from(x)) << 54 >> 2);
            let c = f32::from_bits(u32::max_value().wrapping_sub(u32::from(x)) << 25 >> 2);
            assert_eq!(a, b);
            assert_eq!(a, f64::from(c));
        }
    }
    /// Small cardinality: duplicates collapse, clear() restores emptiness.
    #[test]
    fn hyperloglog_test_simple() {
        let mut hll = HyperLogLog::new(0.00408);
        let keys = ["test1", "test2", "test3", "test2", "test2", "test2"];
        for k in &keys {
            hll.push(k);
        }
        // 3 distinct keys; at this precision the estimate rounds to exactly 3.
        assert!((hll.len().round() - 3.0).abs() < f64::EPSILON);
        assert!(!hll.is_empty());
        hll.clear();
        assert!(hll.is_empty());
        assert!(hll.len() == 0.0);
    }
    /// Union of two sketches counts the distinct keys of both (4 total).
    #[test]
    fn hyperloglog_test_merge() {
        let mut hll = HyperLogLog::new(0.00408);
        let keys = ["test1", "test2", "test3", "test2", "test2", "test2"];
        for k in &keys {
            hll.push(k);
        }
        assert!((hll.len().round() - 3.0).abs() < f64::EPSILON);
        // new_from clones parameters but starts empty, so the union is valid.
        let mut hll2 = HyperLogLog::new_from(&hll);
        let keys2 = ["test3", "test4", "test4", "test4", "test4", "test1"];
        for k in &keys2 {
            hll2.push(k);
        }
        assert!((hll2.len().round() - 3.0).abs() < f64::EPSILON);
        hll.union(&hll2);
        assert!((hll.len().round() - 4.0).abs() < f64::EPSILON);
    }
    /// 100k distinct pushes land within 3 standard-error bands of the truth.
    /// Skipped under miri (too slow for the interpreter).
    #[test]
    #[cfg_attr(miri, ignore)]
    fn push() {
        let actual = 100_000.0;
        let p = 0.05;
        let mut hll = HyperLogLog::new(p);
        for i in 0..f64_to_usize(actual) {
            hll.push(&i);
        }
        assert!(hll.len() > (actual - (actual * p * 3.0)));
        assert!(hll.len() < (actual + (actual * p * 3.0)));
    }
}