use std::hash::{DefaultHasher, Hash, Hasher};
use crate::math;
use crate::{DeserializeError, FORMAT_VERSION, MAGIC, MergeError};
use crate::{MAX_P, MIN_P, T};
// Number of extension bits per register (the "d" parameter of ExaLogLog).
const D: u32 = 20;
// Each logical register occupies the low 28 bits of a packed 32-bit read.
const REGISTER_MASK: u32 = (1u32 << 28) - 1;
// Serialized header layout: 4 magic bytes + version byte + T + D + p.
const HEADER_LEN: usize = 8;
// Two 28-bit registers are packed into every 7 bytes of dense storage.
const BYTES_PER_PAIR: usize = 7;
// High bit of the serialized version byte marks a sparse-encoded payload.
const FORMAT_FLAG_SPARSE: u8 = 0x80;
#[derive(Clone, Debug)]
pub struct ExaLogLog {
    /// Precision parameter; the sketch has `2^p` registers.
    p: u32,
    /// Sparse token list or dense packed register array.
    storage: Storage,
    /// Running martingale cardinality estimate (meaningful only while
    /// `martingale_invalid` is false).
    martingale: f64,
    /// Remaining probability mass used by the martingale update; set to NaN
    /// once the estimator is invalidated.
    mu: f64,
    /// True when `martingale`/`mu` are stale; `estimate_martingale` then
    /// returns `None`.
    martingale_invalid: bool,
}
#[derive(Clone, Debug)]
enum Storage {
    /// Sorted, deduplicated 32-bit tokens; exact for small cardinalities.
    Sparse(Vec<u32>),
    /// Packed register array: two 28-bit registers per 7 bytes.
    Dense(Box<[u8]>),
}
/// Break-even token count for precision `p`: beyond this many 4-byte sparse
/// tokens, the dense layout (3.5 bytes per register, `2^p * 7 / 8` tokens'
/// worth) is no larger, so the sketch should promote to dense mode.
fn sparse_capacity(p: u32) -> usize {
    (7usize << p) / 8
}
// Largest precision at which a 32-bit token still reconstructs the register
// index exactly; above this, sketches must start (and stay) dense.
const MAX_P_SPARSE: u32 = math::SPARSE_V - T;
/// Size in bytes of the dense register array for precision `p`: `2^p`
/// registers packed two per `BYTES_PER_PAIR`-byte chunk.
fn dense_storage_bytes(p: u32) -> usize {
    let pairs = (1usize << p) / 2;
    pairs * BYTES_PER_PAIR
}
impl ExaLogLog {
/// Create an empty sketch with `2^p` registers.
///
/// Starts in exact sparse mode when tokens can represent precision `p`;
/// otherwise falls back to dense mode immediately.
///
/// # Panics
/// Panics when `p` is outside `[MIN_P, MAX_P]`.
pub fn new(p: u32) -> Self {
    assert!(
        (MIN_P..=MAX_P).contains(&p),
        "precision p={p} out of range [{MIN_P}, {MAX_P}]"
    );
    if p > MAX_P_SPARSE {
        // Sparse tokens cannot encode the register index at this precision.
        Self::new_dense(p)
    } else {
        Self {
            p,
            storage: Storage::Sparse(Vec::new()),
            martingale: 0.0,
            mu: 1.0,
            // Sparse mode never tracks the martingale estimator.
            martingale_invalid: true,
        }
    }
}
/// Create a sketch whose precision is chosen so the expected relative
/// standard error is at most `target_rmse`, clamped to `[MIN_P, MAX_P]`.
///
/// Sizing uses the memory-variance product: `rmse^2 ≈ MVP / (bits * m)`,
/// solved for the register count `m` and rounded up to a power of two.
///
/// # Panics
/// Panics when `target_rmse` is not finite and positive.
pub fn with_target_rmse(target_rmse: f64) -> Self {
    assert!(
        target_rmse.is_finite() && target_rmse > 0.0,
        "with_target_rmse: target must be finite and positive, got {target_rmse}"
    );
    const MVP: f64 = 3.67;
    const BITS_PER_REGISTER: f64 = 28.0;
    let registers_needed = MVP / (BITS_PER_REGISTER * target_rmse * target_rmse);
    let raw_p = registers_needed.log2().ceil() as i32;
    let p = raw_p.clamp(MIN_P as i32, MAX_P as i32) as u32;
    Self::new(p)
}
/// Create an empty dense-mode sketch with `2^p` registers, skipping the
/// sparse stage entirely. Martingale tracking starts enabled.
///
/// # Panics
/// Panics when `p` is outside `[MIN_P, MAX_P]`.
pub fn new_dense(p: u32) -> Self {
    assert!(
        (MIN_P..=MAX_P).contains(&p),
        "precision p={p} out of range [{MIN_P}, {MAX_P}]"
    );
    let zeroed = vec![0u8; dense_storage_bytes(p)].into_boxed_slice();
    Self {
        p,
        storage: Storage::Dense(zeroed),
        martingale: 0.0,
        mu: 1.0,
        martingale_invalid: false,
    }
}
/// The precision parameter `p` this sketch was created with.
#[must_use]
pub fn precision(&self) -> u32 {
    self.p
}
/// Number of logical registers, `2^p`.
#[must_use]
pub fn num_registers(&self) -> usize {
    1 << self.p
}
/// Bytes currently devoted to register storage: allocated token capacity
/// in sparse mode, the packed array length in dense mode.
#[must_use]
pub fn register_bytes(&self) -> usize {
    match &self.storage {
        Storage::Sparse(tokens) => tokens.capacity() * 4,
        Storage::Dense(b) => b.len(),
    }
}
/// True while the sketch is still in exact sparse mode.
#[must_use]
pub fn is_sparse(&self) -> bool {
    matches!(self.storage, Storage::Sparse(_))
}
/// The compile-time "d" parameter (extension bits per register).
#[must_use]
pub fn d_parameter() -> u32 {
    D
}
/// Read the logical value of register `i`, regardless of storage mode.
///
/// In sparse mode the register is reconstructed by replaying every stored
/// token that maps to index `i`; dense mode reads the packed value
/// directly.
///
/// # Panics
/// Panics when `i` is outside `0..num_registers()`.
#[inline]
#[must_use]
pub fn get_register(&self, i: usize) -> u32 {
    assert!(
        i < self.num_registers(),
        "register index {i} out of range [0, {})",
        self.num_registers()
    );
    match &self.storage {
        Storage::Dense(_) => self.read_dense_register(i),
        Storage::Sparse(tokens) => tokens
            .iter()
            .map(|&tok| math::hash_to_register_k(math::token_to_hash(tok), self.p))
            .filter(|&(idx, _)| idx == i)
            .fold(0u32, |reg, (_, k)| math::apply_insert(reg, k, D)),
    }
}
/// Decode one 28-bit register from the packed dense array.
///
/// Registers live two per 7-byte little-endian chunk: the even-indexed
/// register in bits 0..28 of the chunk, the odd-indexed one in bits 28..56.
fn read_dense_register(&self, i: usize) -> u32 {
    let bytes = match &self.storage {
        Storage::Dense(b) => b,
        Storage::Sparse(_) => unreachable!("read_dense_register called in sparse mode"),
    };
    let base = (i >> 1) * BYTES_PER_PAIR;
    // Assemble the whole 56-bit pair into one little-endian word, then
    // extract whichever half was requested.
    let mut word = 0u64;
    for (j, &b) in bytes[base..base + BYTES_PER_PAIR].iter().enumerate() {
        word |= u64::from(b) << (8 * j);
    }
    let half = if i & 1 == 0 { word } else { word >> 28 };
    (half as u32) & REGISTER_MASK
}
/// Encode a 28-bit value into register `i` of the packed dense array,
/// preserving the neighboring register that shares byte 3 of the 7-byte
/// pair chunk.
fn write_dense_register(&mut self, i: usize, value: u32) {
    debug_assert!(i < self.num_registers());
    debug_assert!(
        value <= REGISTER_MASK,
        "register value {value:#x} exceeds 28 bits"
    );
    let storage = match &mut self.storage {
        Storage::Dense(b) => b,
        Storage::Sparse(_) => unreachable!("write_dense_register called in sparse mode"),
    };
    let chunk_off = (i >> 1) * BYTES_PER_PAIR;
    let v = value & REGISTER_MASK;
    if i & 1 == 0 {
        // Even register: bytes 0..3 plus the LOW nibble of byte 3.
        storage[chunk_off] = v as u8;
        storage[chunk_off + 1] = (v >> 8) as u8;
        storage[chunk_off + 2] = (v >> 16) as u8;
        // Read-modify-write: byte 3's high nibble belongs to the odd register.
        let high4 = storage[chunk_off + 3] & 0xF0;
        storage[chunk_off + 3] = high4 | ((v >> 24) as u8 & 0x0F);
    } else {
        // Odd register: HIGH nibble of byte 3 plus bytes 4..7.
        let low4 = storage[chunk_off + 3] & 0x0F;
        storage[chunk_off + 3] = low4 | ((v << 4) as u8 & 0xF0);
        storage[chunk_off + 4] = (v >> 4) as u8;
        storage[chunk_off + 5] = (v >> 12) as u8;
        storage[chunk_off + 6] = (v >> 20) as u8;
    }
}
/// Convert sparse storage to dense in place; a no-op when already dense.
///
/// Replays every stored token into the packed register array. The
/// martingale state is invalidated because per-insert history is lost.
pub fn densify(&mut self) {
    let tokens = match std::mem::replace(&mut self.storage, Storage::Sparse(Vec::new())) {
        Storage::Dense(d) => {
            // Already dense: put the storage back untouched and bail out.
            self.storage = Storage::Dense(d);
            return;
        }
        Storage::Sparse(t) => t,
    };
    self.storage = Storage::Dense(vec![0u8; dense_storage_bytes(self.p)].into_boxed_slice());
    for tok in tokens {
        let (i, k) = math::hash_to_register_k(math::token_to_hash(tok), self.p);
        let current = self.read_dense_register(i);
        let updated = math::apply_insert(current, k, D);
        if updated != current {
            self.write_dense_register(i, updated);
        }
    }
    self.martingale_invalid = true;
    self.martingale = f64::NAN;
    self.mu = f64::NAN;
}
/// Insert a single pre-computed 64-bit hash.
pub fn add_hash(&mut self, hash: u64) {
    match &mut self.storage {
        Storage::Sparse(tokens) => {
            let token = math::hash_to_token(hash);
            // Keep the token list sorted and deduplicated; duplicates are
            // ignored, so repeated inserts do not grow storage.
            if let Err(idx) = tokens.binary_search(&token) {
                tokens.insert(idx, token);
                // Promote once sparse storage would outgrow dense storage.
                if tokens.len() > sparse_capacity(self.p) {
                    self.densify();
                }
            }
        }
        Storage::Dense(_) => {
            let (i, k) = math::hash_to_register_k(hash, self.p);
            let r = self.read_dense_register(i);
            let new_r = math::apply_insert(r, k, D);
            if new_r != r {
                if !self.martingale_invalid {
                    // Order matters: the increment uses mu from BEFORE this
                    // state change, then mu shrinks by the probability mass
                    // consumed by the r -> new_r transition.
                    self.martingale += 1.0 / self.mu;
                    self.mu -= math::h(r, self.p, D) - math::h(new_r, self.p, D);
                    // Clamp so 1/mu cannot blow up to infinity.
                    if self.mu < 1e-300 {
                        self.mu = 1e-300;
                    }
                }
                self.write_dense_register(i, new_r);
            }
        }
    }
}
/// Insert a batch of pre-computed 64-bit hashes.
///
/// In sparse mode the tokens are appended in bulk and the sorted/deduped
/// invariant is restored once at the end; in dense mode each hash is
/// inserted individually.
pub fn add_hashes(&mut self, hashes: &[u64]) {
    match &mut self.storage {
        Storage::Sparse(tokens) => {
            tokens.reserve(hashes.len());
            tokens.extend(hashes.iter().map(|&h| math::hash_to_token(h)));
            tokens.sort_unstable();
            tokens.dedup();
            if tokens.len() > sparse_capacity(self.p) {
                self.densify();
            }
        }
        Storage::Dense(_) => {
            for &h in hashes {
                self.add_hash(h);
            }
        }
    }
}
/// Bulk-insert hashes via a register-grouped pass over the dense array.
///
/// Forces dense mode, maps every hash to its (register, k) pair, groups
/// the pairs by register with a counting sort, and applies each group with
/// a single read-modify-write of the packed register.
///
/// Martingale tracking cannot survive a bulk update, so the running
/// estimate is invalidated.
pub fn add_hashes_sorted(&mut self, hashes: &[u64]) {
    if hashes.is_empty() {
        return;
    }
    self.densify();
    let p = self.p;
    let mut iks: Vec<(u32, u32)> = Vec::with_capacity(hashes.len());
    #[cfg(all(target_arch = "x86_64", feature = "simd"))]
    crate::simd_x86::fill_iks(hashes, p, &mut iks);
    #[cfg(not(all(target_arch = "x86_64", feature = "simd")))]
    math::fill_iks(hashes, p, &mut iks);
    math::counting_sort_by_register(&mut iks, 1usize << p);
    let mut idx = 0;
    while idx < iks.len() {
        let i = iks[idx].0 as usize;
        let r_start = self.read_dense_register(i);
        let mut r = r_start;
        // Consume the whole run of pairs targeting register `i`.
        while idx < iks.len() && iks[idx].0 as usize == i {
            r = math::apply_insert(r, iks[idx].1, D);
            idx += 1;
        }
        if r != r_start {
            self.write_dense_register(i, r);
        }
    }
    // Fix: keep the invalidated state consistent with densify()/merge() —
    // previously only the flag was set and stale martingale/mu values were
    // left behind.
    self.martingale_invalid = true;
    self.martingale = f64::NAN;
    self.mu = f64::NAN;
}
/// Hash `item` with the std `DefaultHasher` and insert the resulting
/// 64-bit hash.
pub fn add<H: Hash + ?Sized>(&mut self, item: &H) {
    let mut hasher = DefaultHasher::new();
    item.hash(&mut hasher);
    let h = hasher.finish();
    self.add_hash(h);
}
/// Current cardinality estimate; delegates to the ML estimator.
#[must_use]
pub fn estimate(&self) -> f64 {
    self.estimate_ml()
}
/// Maximum-likelihood cardinality estimate.
///
/// Sparse mode estimates straight from the token list; dense mode first
/// summarizes all registers into the (alpha, beta) statistics and then
/// solves the ML equation.
#[must_use]
pub fn estimate_ml(&self) -> f64 {
    match &self.storage {
        Storage::Sparse(tokens) => math::estimate_from_tokens(tokens),
        Storage::Dense(_) => {
            let registers = (0..self.num_registers()).map(|idx| self.read_dense_register(idx));
            let (alpha, beta) = math::compute_alpha_beta(registers, self.p, D);
            math::solve_ml(alpha, &beta, self.p)
        }
    }
}
/// The running martingale estimate, or `None` once it has been invalidated
/// (sparse mode, bulk inserts, merges, or deserialization).
#[must_use]
pub fn estimate_martingale(&self) -> Option<f64> {
    (!self.martingale_invalid).then(|| self.martingale)
}
/// Merge `other` into `self`, producing the sketch of the set union.
///
/// Two sparse sketches stay sparse (token-list union) unless the result
/// exceeds the sparse capacity; any other combination forces `self` dense
/// and folds `other` in register by register. Dense merges invalidate the
/// martingale estimator.
///
/// # Errors
/// Returns `MergeError::PrecisionMismatch` when the precisions differ.
pub fn merge(&mut self, other: &Self) -> Result<(), MergeError> {
    if self.p != other.p {
        return Err(MergeError::PrecisionMismatch {
            lhs: self.p,
            rhs: other.p,
        });
    }
    match (&mut self.storage, &other.storage) {
        (Storage::Sparse(self_tokens), Storage::Sparse(other_tokens)) => {
            // Linear merge of two sorted, deduplicated token lists.
            let mut merged = Vec::with_capacity(self_tokens.len() + other_tokens.len());
            let (mut ai, mut bi) = (0usize, 0usize);
            while ai < self_tokens.len() && bi < other_tokens.len() {
                let (x, y) = (self_tokens[ai], other_tokens[bi]);
                merged.push(x.min(y));
                // Equal tokens advance both cursors, deduplicating.
                if x <= y {
                    ai += 1;
                }
                if y <= x {
                    bi += 1;
                }
            }
            merged.extend_from_slice(&self_tokens[ai..]);
            merged.extend_from_slice(&other_tokens[bi..]);
            let over_capacity = merged.len() > sparse_capacity(self.p);
            self.storage = Storage::Sparse(merged);
            if over_capacity {
                self.densify();
            }
        }
        _ => {
            self.densify();
            match &other.storage {
                Storage::Sparse(other_tokens) => {
                    for &tok in other_tokens {
                        let (i, k) = math::hash_to_register_k(math::token_to_hash(tok), self.p);
                        let r = self.read_dense_register(i);
                        let new_r = math::apply_insert(r, k, D);
                        if new_r != r {
                            self.write_dense_register(i, new_r);
                        }
                    }
                }
                Storage::Dense(_) => {
                    for i in 0..self.num_registers() {
                        // Read each register once (previously it was read a
                        // second time just to decide whether to write).
                        let current = self.read_dense_register(i);
                        let merged =
                            math::merge_register(current, other.read_dense_register(i), D);
                        if merged != current {
                            self.write_dense_register(i, merged);
                        }
                    }
                }
            }
            self.martingale_invalid = true;
            self.martingale = f64::NAN;
            self.mu = f64::NAN;
        }
    }
    Ok(())
}
/// Merge every sketch in `sketches` into `self`, stopping at the first
/// error.
///
/// # Errors
/// Propagates the first `MergeError` from an individual merge.
pub fn merge_iter<'a, I>(&mut self, sketches: I) -> Result<(), MergeError>
where
    I: IntoIterator<Item = &'a Self>,
{
    sketches.into_iter().try_for_each(|s| self.merge(s))
}
/// Return a copy of this sketch reduced to the smaller precision `new_p`,
/// as if every element had been inserted at `new_p` from the start.
///
/// # Panics
/// Panics when `new_p` is outside `[MIN_P, self.p]`.
pub fn reduce(&self, new_p: u32) -> Self {
    assert!(
        (MIN_P..=MAX_P).contains(&new_p) && new_p <= self.p,
        "new_p={new_p} must be in [{MIN_P}, {self_p}]",
        self_p = self.p
    );
    // Sparse mode: tokens keep enough of the original hash to simply
    // re-insert at the lower precision.
    if let Storage::Sparse(tokens) = &self.storage {
        let mut out = Self::new(new_p);
        for &tok in tokens {
            out.add_hash(math::token_to_hash(tok));
        }
        return out;
    }
    let mut out = Self::new_dense(new_p);
    if new_p == self.p {
        // Same precision: straight register copy.
        for i in 0..self.num_registers() {
            out.write_dense_register(i, self.read_dense_register(i));
        }
        out.martingale_invalid = true;
        return out;
    }
    let p_diff = self.p - new_p;
    let m_new = 1usize << new_p;
    let two_t = 1u32 << T;
    // NOTE(review): `a` looks like the threshold on the register's upper
    // field u = r >> D above which the rank information must be rescaled
    // for the lower precision — TODO confirm against the ExaLogLog
    // reduction rule.
    let a = (64 - T - self.p) * two_t + 1;
    for new_i in 0..m_new {
        let mut acc = 0u32;
        for j in 0..(1u64 << p_diff) {
            // Old registers new_i + m_new * j (for all j) fold into new
            // register new_i.
            let old_i = new_i + m_new * j as usize;
            let mut r = self.read_dense_register(old_i);
            let u = r >> D;
            if u >= a {
                // The freed index bits of old register j contribute
                // p_diff - bit_len(j) additional leading-zero information.
                let bit_len_j = if j == 0 { 0 } else { 64 - j.leading_zeros() };
                let s = (p_diff - bit_len_j) * two_t;
                if s > 0 {
                    // Presumably shifts the low "extension" bits of the
                    // register down by s while keeping the top bits —
                    // verify against the reference implementation.
                    let v = D + a - u;
                    if v > 0 {
                        let high = (r >> v) << v;
                        let low_v = r & ((1u32 << v) - 1);
                        let low_v_shifted = low_v >> s;
                        r = high | low_v_shifted;
                    }
                    r += s << D;
                }
            }
            acc = math::merge_register(acc, r, D);
        }
        out.write_dense_register(new_i, acc & REGISTER_MASK);
    }
    out.martingale_invalid = true;
    out
}
/// True when nothing has ever been inserted: no sparse tokens, or an
/// all-zero dense register array.
#[must_use]
pub fn is_empty(&self) -> bool {
    match &self.storage {
        Storage::Sparse(tokens) => tokens.is_empty(),
        Storage::Dense(s) => !s.iter().any(|&b| b != 0),
    }
}
pub fn clear(&mut self) {
self.storage = Storage::Sparse(Vec::new());
self.martingale = 0.0;
self.mu = 1.0;
self.martingale_invalid = true;
}
/// Serialize the sketch: an 8-byte header (magic, version with optional
/// sparse flag, T, D, p) followed by either the raw dense register bytes
/// or a little-endian token count plus the tokens.
pub fn to_bytes(&self) -> Vec<u8> {
    let (flag, payload_len) = match &self.storage {
        Storage::Dense(s) => (0u8, s.len()),
        Storage::Sparse(tokens) => (FORMAT_FLAG_SPARSE, 4 + tokens.len() * 4),
    };
    let mut out = Vec::with_capacity(HEADER_LEN + payload_len);
    out.extend_from_slice(&MAGIC);
    out.push(FORMAT_VERSION | flag);
    out.push(T as u8);
    out.push(D as u8);
    out.push(self.p as u8);
    match &self.storage {
        Storage::Dense(s) => out.extend_from_slice(s),
        Storage::Sparse(tokens) => {
            out.extend_from_slice(&(tokens.len() as u32).to_le_bytes());
            for &tok in tokens {
                out.extend_from_slice(&tok.to_le_bytes());
            }
        }
    }
    out
}
/// Deserialize a sketch produced by `to_bytes`.
///
/// # Errors
/// Returns a `DeserializeError` when the input is truncated, has the wrong
/// magic or version, was built with different T/D parameters, carries an
/// out-of-range precision, or has a payload whose length does not match
/// the header.
pub fn from_bytes(bytes: &[u8]) -> Result<Self, DeserializeError> {
    if bytes.len() < HEADER_LEN {
        return Err(DeserializeError::TooShort {
            got: bytes.len(),
            need: HEADER_LEN,
        });
    }
    if bytes[0..4] != MAGIC {
        return Err(DeserializeError::BadMagic);
    }
    let raw_version = bytes[4];
    // The high bit of the version byte is the sparse-payload flag.
    let is_sparse = raw_version & FORMAT_FLAG_SPARSE != 0;
    let version = raw_version & !FORMAT_FLAG_SPARSE;
    if version != FORMAT_VERSION {
        return Err(DeserializeError::UnsupportedVersion(raw_version));
    }
    let t = bytes[5];
    let d = bytes[6];
    if u32::from(t) != T || u32::from(d) != D {
        return Err(DeserializeError::ParameterMismatch { t, d });
    }
    let p = bytes[7];
    if !(MIN_P..=MAX_P).contains(&u32::from(p)) {
        return Err(DeserializeError::InvalidPrecision(p));
    }
    if is_sparse {
        // Sparse payload: u32 token count then `count` little-endian tokens.
        if bytes.len() < HEADER_LEN + 4 {
            return Err(DeserializeError::TooShort {
                got: bytes.len(),
                need: HEADER_LEN + 4,
            });
        }
        let count =
            u32::from_le_bytes(bytes[HEADER_LEN..HEADER_LEN + 4].try_into().unwrap()) as usize;
        let expected = HEADER_LEN + 4 + count * 4;
        if bytes.len() != expected {
            return Err(DeserializeError::LengthMismatch {
                got: bytes.len(),
                expected,
            });
        }
        let mut tokens = Vec::with_capacity(count);
        for i in 0..count {
            let off = HEADER_LEN + 4 + i * 4;
            tokens.push(u32::from_le_bytes(bytes[off..off + 4].try_into().unwrap()));
        }
        // Defensive: restore the sorted/deduplicated invariant even for
        // inputs not produced by `to_bytes`.
        tokens.sort_unstable();
        tokens.dedup();
        return Ok(Self {
            p: u32::from(p),
            storage: Storage::Sparse(tokens),
            // Martingale history is not serialized; mark it invalid.
            martingale: f64::NAN,
            mu: f64::NAN,
            martingale_invalid: true,
        });
    }
    // Dense payload: exactly the packed register bytes.
    let m = 1usize << p;
    let expected_len = HEADER_LEN + (m / 2) * BYTES_PER_PAIR;
    if bytes.len() != expected_len {
        return Err(DeserializeError::LengthMismatch {
            got: bytes.len(),
            expected: expected_len,
        });
    }
    let storage: Box<[u8]> = bytes[HEADER_LEN..].to_vec().into_boxed_slice();
    Ok(Self {
        p: u32::from(p),
        storage: Storage::Dense(storage),
        martingale: f64::NAN,
        mu: f64::NAN,
        martingale_invalid: true,
    })
}
}
impl PartialEq for ExaLogLog {
    /// Equality is logical: same precision and identical register contents,
    /// independent of sparse vs dense representation.
    fn eq(&self, other: &Self) -> bool {
        self.p == other.p
            && (0..self.num_registers()).all(|i| self.get_register(i) == other.get_register(i))
    }
}
impl Eq for ExaLogLog {}
impl Extend<u64> for ExaLogLog {
    /// Accepts an iterator of pre-computed 64-bit hashes; buffers them so
    /// the batched insert path can be used.
    fn extend<I: IntoIterator<Item = u64>>(&mut self, iter: I) {
        let buffered = iter.into_iter().collect::<Vec<u64>>();
        self.add_hashes(&buffered);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // SplitMix64 finalizer: cheap, deterministic pseudo-random 64-bit
    // hashes for test input.
    fn splitmix64(mut x: u64) -> u64 {
        x = x.wrapping_add(0x9E37_79B9_7F4A_7C15);
        x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        x ^ (x >> 31)
    }
    // Dense bit-packing: writing then reading every register is lossless.
    #[test]
    fn pack_unpack_roundtrip_for_each_register() {
        let p = 6;
        let m = 1usize << p;
        let mut s = ExaLogLog::new_dense(p);
        for i in 0..m {
            let v = ((0xA5A5A5u32.wrapping_mul(i as u32 + 1)) ^ 0x0DEADBu32) & REGISTER_MASK;
            s.write_dense_register(i, v);
        }
        for i in 0..m {
            let v = ((0xA5A5A5u32.wrapping_mul(i as u32 + 1)) ^ 0x0DEADBu32) & REGISTER_MASK;
            assert_eq!(s.get_register(i), v);
        }
    }
    // The nibble shared by even/odd registers in byte 3 must not leak.
    #[test]
    fn writing_register_does_not_disturb_neighbors() {
        let p = 8;
        let mut s = ExaLogLog::new_dense(p);
        for i in 0..s.num_registers() {
            s.write_dense_register(i, REGISTER_MASK);
        }
        for i in (0..s.num_registers()).step_by(2) {
            s.write_dense_register(i, 0);
        }
        for i in 0..s.num_registers() {
            let expected = if i % 2 == 0 { 0 } else { REGISTER_MASK };
            assert_eq!(s.get_register(i), expected, "register {i}");
        }
    }
    #[test]
    fn empty_sketch_estimates_zero() {
        let s = ExaLogLog::new(12);
        assert_eq!(s.estimate(), 0.0);
        assert!(s.is_sparse());
    }
    #[test]
    fn sparse_mode_is_exact_for_small_n() {
        for &n in &[1u64, 5, 10, 50, 100, 500] {
            let mut s = ExaLogLog::new(12);
            for i in 0..n {
                s.add_hash(splitmix64(i));
            }
            assert!(s.is_sparse(), "expected sparse mode at n={n}");
            let est = s.estimate_ml();
            let rel_err = (est - n as f64).abs() / n as f64;
            assert!(
                rel_err < 0.20,
                "sparse n={n} estimate={est}, rel_err={rel_err}"
            );
        }
    }
    #[test]
    fn auto_promotes_at_break_even() {
        let p = 8;
        let break_even = sparse_capacity(p);
        let mut s = ExaLogLog::new(p);
        for i in 0..(break_even as u64 + 100) {
            s.add_hash(splitmix64(i));
        }
        assert!(!s.is_sparse(), "should have promoted to dense");
    }
    #[test]
    fn ml_estimate_within_error_bounds_after_promotion() {
        let p = 12;
        for &n in &[10_000u64, 100_000, 1_000_000] {
            let mut s = ExaLogLog::new(p);
            for i in 0..n {
                s.add_hash(splitmix64(i));
            }
            assert!(!s.is_sparse(), "n={n} should have promoted");
            let est = s.estimate_ml();
            let rel_err = (est - n as f64).abs() / n as f64;
            assert!(rel_err < 0.05, "n={n}: est={est}, rel_err={rel_err}");
        }
    }
    #[test]
    fn idempotent_inserts_in_sparse_mode_dont_grow_storage() {
        let mut s = ExaLogLog::new(12);
        for _ in 0..1000 {
            s.add_hash(0xDEAD_BEEF_CAFE_BABE);
        }
        assert!(s.is_sparse());
        if let Storage::Sparse(tokens) = &s.storage {
            assert_eq!(tokens.len(), 1);
        }
    }
    // densify() must replay tokens into exactly the state direct dense
    // inserts would have produced.
    #[test]
    fn dense_directly_built_matches_sparse_then_dense() {
        let p = 8;
        let break_even = sparse_capacity(p);
        let n = (break_even as u64 + 50) * 3;
        let mut sparse_then_dense = ExaLogLog::new(p);
        let mut dense_only = ExaLogLog::new_dense(p);
        for i in 0..n {
            let h = splitmix64(i);
            sparse_then_dense.add_hash(h);
            dense_only.add_hash(h);
        }
        assert!(!sparse_then_dense.is_sparse());
        for i in 0..(1usize << p) {
            assert_eq!(
                sparse_then_dense.get_register(i),
                dense_only.get_register(i),
                "register {i}"
            );
        }
    }
    #[test]
    fn merge_sparse_with_sparse_unions_tokens() {
        let p = 12;
        let mut a = ExaLogLog::new(p);
        let mut b = ExaLogLog::new(p);
        for i in 0..50u64 {
            a.add_hash(splitmix64(i));
        }
        for i in 30..80u64 {
            b.add_hash(splitmix64(i));
        }
        a.merge(&b).unwrap();
        let est = a.estimate_ml();
        let rel_err = (est - 80.0).abs() / 80.0;
        assert!(rel_err < 0.05, "merged sparse estimate = {est}");
    }
    #[test]
    fn merge_sparse_with_dense_yields_dense() {
        let p = 8;
        let mut sparse = ExaLogLog::new(p);
        let mut dense = ExaLogLog::new(p);
        for i in 0..50u64 {
            sparse.add_hash(splitmix64(i));
        }
        for i in 0..(sparse_capacity(p) as u64 + 100) {
            dense.add_hash(splitmix64(i + 1_000_000));
        }
        assert!(!dense.is_sparse());
        sparse.merge(&dense).unwrap();
        assert!(!sparse.is_sparse());
    }
    #[test]
    fn serialize_roundtrip_sparse() {
        let p = 12;
        let mut s = ExaLogLog::new(p);
        for i in 0..50u64 {
            s.add_hash(splitmix64(i));
        }
        let est = s.estimate_ml();
        let bytes = s.to_bytes();
        let restored = ExaLogLog::from_bytes(&bytes).unwrap();
        assert!(restored.is_sparse());
        assert!((restored.estimate_ml() - est).abs() < 1e-6);
    }
    #[test]
    fn serialize_roundtrip_dense() {
        let p = 12;
        let mut s = ExaLogLog::new_dense(p);
        for i in 0..50_000u64 {
            s.add_hash(splitmix64(i));
        }
        let est = s.estimate_ml();
        let bytes = s.to_bytes();
        // Header (8 bytes) + 7 bytes per register pair.
        let expected_size = 8 + (1 << p) / 2 * 7;
        assert_eq!(bytes.len(), expected_size);
        let restored = ExaLogLog::from_bytes(&bytes).unwrap();
        assert!(!restored.is_sparse());
        assert!((restored.estimate_ml() - est).abs() < 1e-6);
    }
    // Tokens must preserve (register index, k) for all p up to SPARSE_V - T.
    #[test]
    fn token_round_trip_preserves_registers_for_p_le_v_minus_t() {
        for p in [3u32, 8, 12, 18, 24] {
            for i in 0..1000u64 {
                let h = splitmix64(i);
                let tok = math::hash_to_token(h);
                let h_recon = math::token_to_hash(tok);
                let (i_orig, k_orig) = math::hash_to_register_k(h, p);
                let (i_recon, k_recon) = math::hash_to_register_k(h_recon, p);
                assert_eq!(i_orig, i_recon, "p={p}, i={i}: index mismatch");
                assert_eq!(k_orig, k_recon, "p={p}, i={i}: k mismatch");
            }
        }
    }
    #[test]
    fn reduce_to_same_p_returns_same_state() {
        let p = 10;
        let mut s = ExaLogLog::new_dense(p);
        for i in 0..10_000u64 {
            s.add_hash(splitmix64(i));
        }
        let r = s.reduce(p);
        for i in 0..s.num_registers() {
            assert_eq!(r.get_register(i), s.get_register(i));
        }
    }
    #[test]
    fn reduce_preserves_estimate_within_tolerance() {
        let p_high = 12;
        let p_low = 10;
        let n = 50_000u64;
        let mut a = ExaLogLog::new_dense(p_high);
        let mut direct = ExaLogLog::new_dense(p_low);
        for i in 0..n {
            let h = splitmix64(i);
            a.add_hash(h);
            direct.add_hash(h);
        }
        let reduced = a.reduce(p_low);
        let red_est = reduced.estimate_ml();
        let dir_est = direct.estimate_ml();
        let rel_diff = (red_est - dir_est).abs() / n as f64;
        assert!(rel_diff < 0.10, "reduced={red_est}, direct={dir_est}");
    }
    #[test]
    fn add_hashes_matches_individual_inserts() {
        for p in [8u32, 12] {
            let n = 50_000u64;
            let mut serial = ExaLogLog::new(p);
            let mut batched = ExaLogLog::new(p);
            let hashes: Vec<u64> = (0..n).map(splitmix64).collect();
            for &h in &hashes {
                serial.add_hash(h);
            }
            batched.add_hashes(&hashes);
            for i in 0..serial.num_registers() {
                assert_eq!(
                    serial.get_register(i),
                    batched.get_register(i),
                    "p={p} register {i} differs"
                );
            }
            assert!((serial.estimate_ml() - batched.estimate_ml()).abs() < 1e-6);
        }
    }
    // Lower rmse target must yield strictly higher precision.
    #[test]
    fn with_target_rmse_picks_appropriate_p() {
        let s_2pct = ExaLogLog::with_target_rmse(0.02);
        assert!(s_2pct.precision() >= 8);
        let s_1pct = ExaLogLog::with_target_rmse(0.01);
        assert!(s_1pct.precision() > s_2pct.precision());
        let s_0_5pct = ExaLogLog::with_target_rmse(0.005);
        assert!(s_0_5pct.precision() > s_1pct.precision());
    }
    #[test]
    fn add_hashes_sorted_matches_serial() {
        let p = 12;
        let n = 100_000u64;
        let mut serial = ExaLogLog::new_dense(p);
        let mut sorted = ExaLogLog::new_dense(p);
        let hashes: Vec<u64> = (0..n).map(splitmix64).collect();
        for &h in &hashes {
            serial.add_hash(h);
        }
        sorted.add_hashes_sorted(&hashes);
        for i in 0..serial.num_registers() {
            assert_eq!(
                serial.get_register(i),
                sorted.get_register(i),
                "register {i}"
            );
        }
    }
    #[test]
    fn merge_iter_matches_repeated_merge() {
        let p = 10;
        let mut targets: Vec<ExaLogLog> = (0..5)
            .map(|tid| {
                let mut s = ExaLogLog::new_dense(p);
                for i in (tid * 1000)..((tid + 1) * 1000) {
                    s.add_hash(splitmix64(i));
                }
                s
            })
            .collect();
        let head = targets.remove(0);
        let mut a = head.clone();
        for s in &targets {
            a.merge(s).unwrap();
        }
        let mut b = head;
        b.merge_iter(targets.iter()).unwrap();
        for i in 0..a.num_registers() {
            assert_eq!(a.get_register(i), b.get_register(i));
        }
    }
    #[test]
    fn add_hashes_in_sparse_mode_dedupes() {
        let mut s = ExaLogLog::new(12);
        // 50 distinct hashes, each inserted three times.
        let hashes: Vec<u64> = (0..50u64).chain(0..50).chain(0..50).map(splitmix64).collect();
        s.add_hashes(&hashes);
        assert!(s.is_sparse());
        let est = s.estimate_ml();
        let rel_err = (est - 50.0).abs() / 50.0;
        assert!(rel_err < 0.20, "expected ~50, got {est}");
    }
    #[test]
    fn min_p_works() {
        let mut s = ExaLogLog::new_dense(MIN_P);
        for i in 0..1000u64 {
            s.add_hash(splitmix64(i));
        }
        assert!(s.estimate_ml().is_finite());
    }
    // Precisions above MAX_P_SPARSE cannot be represented by tokens, so
    // `new` must construct a dense sketch directly.
    #[test]
    fn max_p_skips_sparse() {
        let s = ExaLogLog::new(25);
        assert!(!s.is_sparse(), "p=25 should skip sparse mode");
        let s = ExaLogLog::new(MAX_P);
        assert!(!s.is_sparse(), "p=MAX_P should skip sparse mode");
    }
    #[test]
    fn p_24_still_uses_sparse() {
        let s = ExaLogLog::new(MAX_P_SPARSE);
        assert!(s.is_sparse(), "p={MAX_P_SPARSE} should still be sparse");
    }
    #[test]
    fn empty_sketch_is_empty() {
        let s = ExaLogLog::new(12);
        assert!(s.is_empty());
        let s = ExaLogLog::new_dense(12);
        assert!(s.is_empty());
    }
    #[test]
    fn populated_sketch_is_not_empty() {
        let mut s = ExaLogLog::new(12);
        s.add_hash(0xDEAD_BEEF);
        assert!(!s.is_empty());
        s.densify();
        assert!(!s.is_empty());
    }
    #[test]
    fn extend_matches_add_hashes() {
        let p = 12;
        let mut a = ExaLogLog::new_dense(p);
        let mut b = ExaLogLog::new_dense(p);
        let hashes: Vec<u64> = (0..50_000u64).map(splitmix64).collect();
        a.add_hashes(&hashes);
        b.extend(hashes.iter().copied());
        assert_eq!(a, b);
    }
    // PartialEq compares logical registers, so sparse and dense sketches
    // holding the same elements compare equal.
    #[test]
    fn equality_matches_register_state() {
        let p = 10;
        let mut a = ExaLogLog::new(p);
        let mut b = ExaLogLog::new_dense(p);
        for i in 0..100u64 {
            let h = splitmix64(i);
            a.add_hash(h);
            b.add_hash(h);
        }
        assert!(a.is_sparse());
        assert!(!b.is_sparse());
        assert_eq!(a, b);
    }
    #[test]
    fn different_p_compare_unequal() {
        let a = ExaLogLog::new(10);
        let b = ExaLogLog::new(11);
        assert_ne!(a, b);
    }
    // Compile-time auto-trait check.
    #[test]
    fn is_send_and_sync() {
        fn assert_send<T: Send>() {}
        fn assert_sync<T: Sync>() {}
        assert_send::<ExaLogLog>();
        assert_sync::<ExaLogLog>();
    }
    // 3.5 bytes/register vs HLL's 6 bits... — checks the dense footprint
    // is exactly m * 7 / 2 bytes.
    #[test]
    fn memory_is_43_percent_smaller_than_hll_6bit() {
        for p in [8u32, 10, 12, 14] {
            let m = 1usize << p;
            let s = ExaLogLog::new_dense(p);
            assert_eq!(s.register_bytes(), m * 7 / 2);
        }
    }
}