use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};
use twox_hash::XxHash64;
const SPARSE_THRESHOLD_DIVISOR: usize = 4;
#[derive(Debug, Clone)]
enum HllRepresentation {
Sparse(BTreeMap<u16, u8>),
Dense(Vec<u8>),
}
#[derive(Debug, Clone)]
pub struct HyperLogLog {
precision: u8,
num_registers: usize,
representation: HllRepresentation,
}
impl HyperLogLog {
pub fn new(precision: u8) -> Self {
assert!((4..=18).contains(&precision), "Precision must be 4-18");
let num_registers = 1 << precision;
Self {
precision,
num_registers,
representation: HllRepresentation::Sparse(BTreeMap::new()),
}
}
pub fn new_dense(precision: u8) -> Self {
assert!((4..=18).contains(&precision), "Precision must be 4-18");
let num_registers = 1 << precision;
Self {
precision,
num_registers,
representation: HllRepresentation::Dense(vec![0; num_registers]),
}
}
pub fn default_precision() -> Self {
Self::new(14)
}
#[inline]
pub fn is_sparse(&self) -> bool {
matches!(self.representation, HllRepresentation::Sparse(_))
}
#[inline]
fn sparse_threshold(&self) -> usize {
self.num_registers / SPARSE_THRESHOLD_DIVISOR
}
fn convert_to_dense(&mut self) {
if let HllRepresentation::Sparse(ref sparse_map) = self.representation {
let mut registers = vec![0u8; self.num_registers];
for (&idx, &rho) in sparse_map.iter() {
registers[idx as usize] = rho;
}
self.representation = HllRepresentation::Dense(registers);
}
}
#[inline]
fn update_register(&mut self, register_idx: usize, rho: u8) {
match &mut self.representation {
HllRepresentation::Sparse(sparse_map) => {
let idx = register_idx as u16;
let current = sparse_map.get(&idx).copied().unwrap_or(0);
if rho > current {
sparse_map.insert(idx, rho);
}
if sparse_map.len() > self.sparse_threshold() {
self.convert_to_dense();
}
}
HllRepresentation::Dense(registers) => {
registers[register_idx] = registers[register_idx].max(rho);
}
}
}
#[inline]
#[allow(dead_code)]
fn get_register(&self, register_idx: usize) -> u8 {
match &self.representation {
HllRepresentation::Sparse(sparse_map) => {
sparse_map.get(&(register_idx as u16)).copied().unwrap_or(0)
}
HllRepresentation::Dense(registers) => registers[register_idx],
}
}
#[inline]
fn hash<T: Hash>(&self, item: &T) -> u64 {
let mut hasher = XxHash64::default();
item.hash(&mut hasher);
hasher.finish()
}
#[inline]
pub fn add<T: Hash>(&mut self, item: &T) {
let hash = self.hash(item);
let register_idx = (hash >> (64 - self.precision)) as usize;
let remaining = hash << self.precision;
let rho = if remaining == 0 {
64 - self.precision + 1
} else {
remaining.leading_zeros() as u8 + 1
};
self.update_register(register_idx, rho);
}
#[inline]
pub fn add_hash(&mut self, hash: u64) {
let register_idx = (hash >> (64 - self.precision)) as usize;
let remaining = hash << self.precision;
let rho = if remaining == 0 {
64 - self.precision + 1
} else {
remaining.leading_zeros() as u8 + 1
};
self.update_register(register_idx, rho);
}
pub fn cardinality(&self) -> u64 {
let m = self.num_registers as f64;
let alpha_m = match self.precision {
4 => 0.673,
5 => 0.697,
6 => 0.709,
_ => 0.7213 / (1.0 + 1.079 / m),
};
let (sum, zeros) = self.compute_harmonic_sum_and_zeros();
let raw_estimate = alpha_m * m * m / sum;
let estimate = if raw_estimate <= 5.0 * m {
let bias = self.estimate_bias(raw_estimate);
let corrected = raw_estimate - bias;
if zeros > 0.0 {
let linear_estimate = m * (m / zeros).ln();
if linear_estimate <= Self::linear_counting_threshold(self.precision) {
return linear_estimate as u64;
}
}
corrected
} else {
raw_estimate
};
if estimate > (1u64 << 32) as f64 / 30.0 {
let two_to_32 = (1u64 << 32) as f64;
return (-two_to_32 * (1.0 - estimate / two_to_32).ln()) as u64;
}
estimate.max(0.0) as u64
}
#[inline]
fn compute_harmonic_sum_and_zeros(&self) -> (f64, f64) {
match &self.representation {
HllRepresentation::Sparse(sparse_map) => {
let mut sum = 0.0_f64;
for &rho in sparse_map.values() {
sum += 2.0_f64.powi(-(rho as i32));
}
let zeros = (self.num_registers - sparse_map.len()) as f64;
sum += zeros; (sum, zeros)
}
HllRepresentation::Dense(registers) => {
let mut sum = 0.0_f64;
let mut zeros = 0.0_f64;
for &r in registers.iter() {
sum += 2.0_f64.powi(-(r as i32));
if r == 0 {
zeros += 1.0;
}
}
(sum, zeros)
}
}
}
#[inline]
fn estimate_bias(&self, raw_estimate: f64) -> f64 {
let m = self.num_registers as f64;
if raw_estimate < 0.5 * m {
0.7 * m * (0.5 * m / raw_estimate).min(1.0)
} else if raw_estimate < 2.5 * m {
0.2 * m * (2.5 * m - raw_estimate) / (2.0 * m)
} else {
0.0
}
}
#[inline]
fn linear_counting_threshold(precision: u8) -> f64 {
match precision {
4 => 10.0,
5 => 20.0,
6 => 40.0,
7 => 80.0,
8 => 220.0,
9 => 400.0,
10 => 900.0,
11 => 1800.0,
12 => 3100.0,
13 => 6500.0,
14 => 11500.0,
15 => 20000.0,
16 => 50000.0,
17 => 120000.0,
18 => 350000.0,
_ => 11500.0, }
}
pub fn merge(&mut self, other: &HyperLogLog) {
assert_eq!(self.precision, other.precision, "Precision mismatch");
match (&mut self.representation, &other.representation) {
(HllRepresentation::Sparse(self_map), HllRepresentation::Sparse(other_map)) => {
for (&idx, &rho) in other_map.iter() {
let current = self_map.get(&idx).copied().unwrap_or(0);
if rho > current {
self_map.insert(idx, rho);
}
}
if self_map.len() > self.num_registers / SPARSE_THRESHOLD_DIVISOR {
self.convert_to_dense();
}
}
(HllRepresentation::Dense(self_regs), HllRepresentation::Dense(other_regs)) => {
for (i, &r) in other_regs.iter().enumerate() {
self_regs[i] = self_regs[i].max(r);
}
}
(HllRepresentation::Sparse(_), HllRepresentation::Dense(other_regs)) => {
self.convert_to_dense();
if let HllRepresentation::Dense(self_regs) = &mut self.representation {
for (i, &r) in other_regs.iter().enumerate() {
self_regs[i] = self_regs[i].max(r);
}
}
}
(HllRepresentation::Dense(self_regs), HllRepresentation::Sparse(other_map)) => {
for (&idx, &rho) in other_map.iter() {
let i = idx as usize;
self_regs[i] = self_regs[i].max(rho);
}
}
}
}
pub fn is_empty(&self) -> bool {
match &self.representation {
HllRepresentation::Sparse(sparse_map) => sparse_map.is_empty(),
HllRepresentation::Dense(registers) => registers.iter().all(|&r| r == 0),
}
}
pub fn clear(&mut self) {
self.representation = HllRepresentation::Sparse(BTreeMap::new());
}
pub fn memory_usage(&self) -> usize {
std::mem::size_of::<Self>()
+ match &self.representation {
HllRepresentation::Sparse(sparse_map) => {
sparse_map.len() * (std::mem::size_of::<u16>() + std::mem::size_of::<u8>())
}
HllRepresentation::Dense(registers) => registers.len(),
}
}
pub fn precision(&self) -> u8 {
self.precision
}
pub fn standard_error(&self) -> f64 {
1.04 / (self.num_registers as f64).sqrt() * 100.0
}
pub fn non_zero_count(&self) -> usize {
match &self.representation {
HllRepresentation::Sparse(sparse_map) => sparse_map.len(),
HllRepresentation::Dense(registers) => registers.iter().filter(|&&r| r != 0).count(),
}
}
}
impl Default for HyperLogLog {
fn default() -> Self {
Self::default_precision()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_cardinality() {
let mut hll = HyperLogLog::new(14);
for i in 0..1000u64 {
hll.add(&i);
}
let estimate = hll.cardinality();
let error = (estimate as f64 - 1000.0).abs() / 1000.0;
assert!(error < 0.05, "Error was {}%", error * 100.0);
}
#[test]
fn test_duplicates() {
let mut hll = HyperLogLog::new(14);
for _ in 0..1000 {
hll.add(&42u64);
}
let estimate = hll.cardinality();
assert!(estimate <= 2, "Estimate was {}", estimate);
}
#[test]
fn test_merge() {
let mut hll1 = HyperLogLog::new(14);
let mut hll2 = HyperLogLog::new(14);
for i in 0..500u64 {
hll1.add(&i);
}
for i in 500..1000u64 {
hll2.add(&i);
}
hll1.merge(&hll2);
let estimate = hll1.cardinality();
let error = (estimate as f64 - 1000.0).abs() / 1000.0;
assert!(error < 0.05, "Error was {}%", error * 100.0);
}
#[test]
fn test_string_items() {
let mut hll = HyperLogLog::new(14);
for i in 0..1000 {
hll.add(&format!("session_{}", i));
}
let estimate = hll.cardinality();
let error = (estimate as f64 - 1000.0).abs() / 1000.0;
assert!(error < 0.05, "Error was {}%", error * 100.0);
}
#[test]
fn test_sparse_mode_small_cardinality() {
let mut hll = HyperLogLog::new(14);
for i in 0..100u64 {
hll.add(&i);
}
assert!(
hll.is_sparse(),
"Should remain in sparse mode for 100 items"
);
let memory = hll.memory_usage();
assert!(
memory < 1000,
"Sparse mode memory should be < 1KB, was {}",
memory
);
let estimate = hll.cardinality();
let error = (estimate as f64 - 100.0).abs() / 100.0;
assert!(error < 0.10, "Error was {}%", error * 100.0);
}
#[test]
fn test_auto_conversion_to_dense() {
let mut hll = HyperLogLog::new(10);
assert!(hll.is_sparse(), "Should start in sparse mode");
for i in 0..400u64 {
hll.add(&i);
}
assert!(
!hll.is_sparse(),
"Should convert to dense mode after threshold"
);
}
#[test]
fn test_new_dense_constructor() {
let hll = HyperLogLog::new_dense(14);
assert!(!hll.is_sparse(), "new_dense should create dense HLL");
}
#[test]
fn test_merge_sparse_sparse() {
let mut hll1 = HyperLogLog::new(14);
let mut hll2 = HyperLogLog::new(14);
for i in 0..50u64 {
hll1.add(&i);
}
for i in 50..100u64 {
hll2.add(&i);
}
assert!(hll1.is_sparse());
assert!(hll2.is_sparse());
hll1.merge(&hll2);
assert!(hll1.is_sparse());
let estimate = hll1.cardinality();
let error = (estimate as f64 - 100.0).abs() / 100.0;
assert!(error < 0.15, "Error was {}%", error * 100.0);
}
#[test]
fn test_merge_dense_sparse() {
let mut hll_dense = HyperLogLog::new_dense(14);
let mut hll_sparse = HyperLogLog::new(14);
for i in 0..500u64 {
hll_dense.add(&i);
}
for i in 500..600u64 {
hll_sparse.add(&i);
}
assert!(!hll_dense.is_sparse());
assert!(hll_sparse.is_sparse());
hll_dense.merge(&hll_sparse);
let estimate = hll_dense.cardinality();
let error = (estimate as f64 - 600.0).abs() / 600.0;
assert!(error < 0.05, "Error was {}%", error * 100.0);
}
#[test]
fn test_clear_resets_to_sparse() {
let mut hll = HyperLogLog::new_dense(14);
for i in 0..1000u64 {
hll.add(&i);
}
assert!(!hll.is_sparse());
hll.clear();
assert!(hll.is_sparse(), "Clear should reset to sparse mode");
assert!(hll.is_empty());
}
}