use crate::core::DocId;
use super::{AggregationResult, Aggregator, AggregatorFactory, MetricResult};
use crate::segment::reader::SegmentReader;
/// A HyperLogLog sketch: `2^precision` one-byte registers, each holding the largest
/// rank (`rho`) observed among the hashes routed to it.
#[derive(Clone, Debug)]
pub struct HllRegisters {
    /// One register per bucket; `1 << precision` entries.
    registers: Vec<u8>,
    /// Number of low hash bits used to select a register.
    precision: u8,
}

impl HllRegisters {
    /// Creates an empty sketch with `2^precision` zeroed registers.
    ///
    /// Allocates `2^precision` bytes, so `precision` should be kept small.
    pub fn new(precision: u8) -> Self {
        let m = 1usize << precision;
        Self {
            registers: vec![0; m],
            precision,
        }
    }

    /// Records one 64-bit hash in the sketch.
    ///
    /// The low `precision` bits select the register. The remaining `64 - precision`
    /// high bits supply `rho`: the 1-based position of their leftmost set bit, or
    /// `64 - precision + 1` when they are all zero. That keeps "all zero" distinct
    /// from "only the lowest bit set", as in the HyperLogLog definition.
    pub fn add_hash(&mut self, hash: u64) {
        let idx = (hash as usize) & (self.registers.len() - 1);
        let w = hash >> self.precision;
        let rho = if w == 0 {
            // All `64 - precision` value bits are zero: rank is window length + 1.
            65 - self.precision
        } else {
            // `w` carries `precision` extra leading zeros from the shift; remove them.
            (w.leading_zeros() as u8) + 1 - self.precision
        };
        let slot = &mut self.registers[idx];
        *slot = (*slot).max(rho);
    }

    /// Folds `other` into `self` by keeping the larger value of each register pair.
    /// Afterwards `self` estimates the cardinality of the union of both inputs.
    ///
    /// Sketches with a different precision (or register count) route hashes to
    /// different registers and cannot be combined register-by-register. Such an
    /// `other` is left unmerged rather than corrupting `self`.
    pub fn merge(&mut self, other: &HllRegisters) {
        if self.precision != other.precision || self.registers.len() != other.registers.len() {
            return;
        }
        for (mine, &theirs) in self.registers.iter_mut().zip(&other.registers) {
            *mine = (*mine).max(theirs);
        }
    }

    /// Returns the estimated number of distinct hashes added.
    ///
    /// Uses the raw HyperLogLog estimator `alpha_m * m^2 / sum(2^-register)`. It
    /// switches to linear counting `m * ln(m / zeros)` while the raw estimate is at
    /// most `2.5 * m` and at least one register is still zero. No large-range or
    /// bias correction is applied. An empty sketch estimates exactly `0.0`.
    pub fn estimate(&self) -> f64 {
        let m = self.registers.len() as f64;
        let mut sum = 0.0f64;
        let mut zeros = 0u32;
        for &r in &self.registers {
            sum += 2.0f64.powi(-i32::from(r));
            if r == 0 {
                zeros += 1;
            }
        }
        // Tabulated constants for small register counts, closed form otherwise.
        let alpha = match self.registers.len() {
            16 => 0.673,
            32 => 0.697,
            64 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / m),
        };
        let raw_estimate = alpha * m * m / sum;
        if raw_estimate <= 2.5 * m && zeros > 0 {
            m * (m / f64::from(zeros)).ln()
        } else {
            raw_estimate
        }
    }

    /// Serializes the sketch as one precision byte followed by the raw registers.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(1 + self.registers.len());
        buf.push(self.precision);
        buf.extend_from_slice(&self.registers);
        buf
    }

    /// Parses the layout produced by [`HllRegisters::to_bytes`].
    ///
    /// Returns `None` in three cases:
    /// - `data` is empty;
    /// - the precision byte is too large to address registers on this platform;
    /// - the number of register bytes is not `2^precision`.
    ///
    /// Accepting such input would make `add_hash` panic (shift overflow or
    /// `len - 1` underflow), or make `merge` combine misaligned registers.
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        let (&precision, registers) = data.split_first()?;
        let expected_len = 1usize.checked_shl(u32::from(precision))?;
        if registers.len() != expected_len {
            return None;
        }
        Some(Self {
            registers: registers.to_vec(),
            precision,
        })
    }
}
/// Factory for the cardinality (approximate distinct count) metric aggregation.
/// Each segment gets its own HyperLogLog collector, and the serialized sketches are
/// merged in [`AggregatorFactory::merge_results`].
#[derive(Debug, Clone)]
pub struct CardinalityAggFactory {
    /// Name of the field whose distinct values are counted.
    pub field_name: String,
    /// HyperLogLog precision; every sketch uses `2^precision` one-byte registers.
    pub precision: u8,
}

impl AggregatorFactory for CardinalityAggFactory {
    /// Builds a per-segment collector over `field_name`.
    ///
    /// If the field is absent from the segment header, `None` is passed to
    /// `OwnedColumn::new` as the field id.
    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
        let field_id = reader
            .header()
            .fields
            .iter()
            .find(|f| f.field_name == self.field_name)
            .map(|f| f.field_id);
        let col = super::bucket::OwnedColumn::new(field_id, reader);
        Box::new(CardinalityCollector {
            hll: HllRegisters::new(self.precision),
            col,
        })
    }

    /// Unions the per-segment sketches carried in `merge_state` and returns their
    /// rounded estimate.
    ///
    /// Results that are not metrics, carry no state, fail to decode, or were built
    /// with a different precision are skipped. Register-wise max is only meaningful
    /// between identically-shaped sketches. Returns a `None` value when no usable
    /// state was found.
    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
        let mut merged = HllRegisters::new(self.precision);
        let mut has_data = false;
        for result in &results {
            let AggregationResult::Metric(metric) = result else {
                continue;
            };
            let Some(ref bytes) = metric.merge_state else {
                continue;
            };
            let Some(segment_hll) = HllRegisters::from_bytes(bytes) else {
                continue;
            };
            if segment_hll.precision != merged.precision
                || segment_hll.registers.len() != merged.registers.len()
            {
                continue;
            }
            merged.merge(&segment_hll);
            has_data = true;
        }
        let value = has_data.then(|| merged.estimate().round());
        AggregationResult::Metric(MetricResult::single(value))
    }
}
/// Per-segment state for the cardinality aggregation: a HyperLogLog sketch plus the
/// column that collected documents' values are read from.
struct CardinalityCollector {
/// Sketch of the hashed values seen so far in this segment.
hll: HllRegisters,
/// Column for the aggregated field; when `None`, `collect` returns without recording anything.
col: Option<super::bucket::OwnedColumn>,
}
// SAFETY: NOTE(review): no justification is visible in this file.
// `HllRegisters` (a `Vec<u8>` and a `u8`) is automatically `Send`, so this manual impl
// only matters if `OwnedColumn` is not. Confirm that `OwnedColumn` owns, or safely
// shares, everything it references (e.g. segment data it may point into) before a
// collector is moved to another thread.
unsafe impl Send for CardinalityCollector {}
impl Aggregator for CardinalityCollector {
    /// Hashes the document's value with xxh3 and records it in the sketch.
    ///
    /// A keyword ordinal takes precedence over a numeric value. NaN numerics and
    /// documents with neither value are ignored, as is every document when the
    /// collector has no column.
    ///
    /// NOTE(review): keyword values are hashed by ordinal, not by term bytes. If
    /// `keyword_ordinal` is a segment-local dictionary ordinal, the same term in two
    /// segments may hash differently (and different terms may collide). That would
    /// skew the cross-segment merge; confirm ordinals are global.
    fn collect(&mut self, doc_id: DocId) {
        let Some(column) = &self.col else {
            return;
        };
        let doc = doc_id.as_u32();
        let hashed = match column.keyword_ordinal(doc) {
            Some(ordinal) => xxhash_rust::xxh3::xxh3_64(&(ordinal as u64).to_le_bytes()),
            None => match column.numeric_value(doc) {
                Some(value) if !value.is_nan() => {
                    xxhash_rust::xxh3::xxh3_64(&value.to_le_bytes())
                }
                _ => return,
            },
        };
        self.hll.add_hash(hashed);
    }

    /// Emits this segment's rounded estimate together with the serialized sketch,
    /// which the factory's `merge_results` decodes and unions across segments.
    fn finish(self: Box<Self>) -> AggregationResult {
        let approx = self.hll.estimate().round();
        let state = self.hll.to_bytes();
        AggregationResult::Metric(MetricResult {
            value: Some(approx),
            extra: std::collections::HashMap::new(),
            merge_state: Some(state),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashes a `u64` the same way the collector hashes keyword ordinals.
    fn hash_u64(i: u64) -> u64 {
        xxhash_rust::xxh3::xxh3_64(&i.to_le_bytes())
    }

    /// Relative error of `estimate` against the true cardinality `expected`.
    fn relative_error(estimate: f64, expected: f64) -> f64 {
        (estimate - expected).abs() / expected
    }

    #[test]
    fn hll_empty() {
        let hll = HllRegisters::new(14);
        assert_eq!(hll.estimate(), 0.0);
    }

    #[test]
    fn hll_single_value() {
        let mut hll = HllRegisters::new(14);
        hll.add_hash(xxhash_rust::xxh3::xxh3_64(b"hello"));
        let est = hll.estimate();
        assert!((0.5..=2.0).contains(&est), "single value estimate: {est}");
    }

    #[test]
    fn hll_known_cardinality() {
        let mut hll = HllRegisters::new(14);
        for i in 0..10000u64 {
            hll.add_hash(hash_u64(i));
        }
        let est = hll.estimate();
        let error = relative_error(est, 10000.0);
        assert!(
            error < 0.05,
            "10K cardinality estimate: {est}, error: {error:.3}"
        );
    }

    #[test]
    fn hll_merge() {
        let mut hll1 = HllRegisters::new(14);
        let mut hll2 = HllRegisters::new(14);
        for i in 0..5000u64 {
            hll1.add_hash(hash_u64(i));
        }
        for i in 5000..10000u64 {
            hll2.add_hash(hash_u64(i));
        }
        hll1.merge(&hll2);
        let est = hll1.estimate();
        let error = relative_error(est, 10000.0);
        assert!(
            error < 0.05,
            "merged 10K estimate: {est}, error: {error:.3}"
        );
    }

    #[test]
    fn hll_duplicate_values() {
        let mut hll = HllRegisters::new(14);
        for _ in 0..10000 {
            hll.add_hash(xxhash_rust::xxh3::xxh3_64(b"same_value"));
        }
        let est = hll.estimate();
        assert!((0.5..=2.0).contains(&est), "duplicate values estimate: {est}");
    }

    /// `merge_results` depends on `from_bytes(to_bytes(x))` reproducing `x` exactly.
    #[test]
    fn hll_bytes_round_trip() {
        let mut hll = HllRegisters::new(10);
        for i in 0..2000u64 {
            hll.add_hash(hash_u64(i));
        }
        let bytes = hll.to_bytes();
        assert_eq!(bytes.len(), 1 + (1 << 10));
        assert_eq!(bytes[0], 10);
        let restored = HllRegisters::from_bytes(&bytes).expect("valid serialized sketch");
        assert_eq!(restored.to_bytes(), bytes);
        assert_eq!(restored.estimate(), hll.estimate());
    }

    #[test]
    fn hll_from_bytes_rejects_empty() {
        assert!(HllRegisters::from_bytes(b"").is_none());
    }
}