lucisearch 0.8.0

Embeddable, in-process search engine — the SQLite/DuckDB of Elasticsearch
Documentation
//! HyperLogLog++ cardinality estimation for the `cardinality` aggregation.
//!
//! Estimates the number of distinct values in a field using probabilistic
//! counting. Uses 64-bit xxh3 hashing (already a dependency) and falls back to
//! linear counting for small cardinalities; the empirical bias-correction
//! tables from the HLL++ paper are not implemented here.
//!
//! See [[feature-aggregations-v010]] and the
//! [HyperLogLog++ paper](https://research.google/pubs/hyperloglog-in-practice-algorithmic-engineering-of-a-state-of-the-art-cardinality-estimation-algorithm/).

use crate::core::DocId;

use super::{AggregationResult, Aggregator, AggregatorFactory, MetricResult};
use crate::segment::reader::SegmentReader;

/// HyperLogLog register array.
///
/// Holds `2^precision` one-byte registers. Each register remembers the largest
/// rank (position of the first set bit in the non-index part of a hash) seen
/// among the hashes routed to it.
#[derive(Clone)]
pub struct HllRegisters {
    /// One register per bucket; invariant: `registers.len() == 1 << precision`.
    registers: Vec<u8>,
    /// Number of low hash bits used to pick a register.
    precision: u8,
}

impl HllRegisters {
    /// Creates an empty sketch with `2^precision` zeroed registers.
    ///
    /// `precision` must be smaller than 64 (and than the bit width of
    /// `usize`): larger values overflow the shifts used here and in
    /// [`add_hash`](Self::add_hash), which panics in debug builds.
    pub fn new(precision: u8) -> Self {
        Self {
            registers: vec![0; 1usize << precision],
            precision,
        }
    }

    /// Add a hashed value to the HLL.
    ///
    /// The low `precision` bits of `hash` select the register; the rank is
    /// derived from the remaining high bits.
    pub fn add_hash(&mut self, hash: u64) {
        // The register count is a power of two, so masking keeps the low bits.
        let idx = (hash as usize) & (self.registers.len() - 1);

        // After the shift the value has `precision` artificial leading zeros;
        // subtracting them leaves the zeros of the real (64 - precision)-bit
        // remainder. The rank is that count plus one. An all-zero remainder
        // yields `64 - precision + 1`, which keeps it distinguishable from a
        // remainder whose only set bit is the lowest one.
        let remainder = hash >> self.precision;
        let rank = (remainder.leading_zeros() + 1 - u32::from(self.precision)) as u8;

        let slot = &mut self.registers[idx];
        *slot = (*slot).max(rank);
    }

    /// Merge another HLL into this one (max per register).
    ///
    /// Both sketches must have been built with the same precision; registers
    /// of sketches with different sizes do not describe the same buckets.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if the register counts differ. Release builds
    /// merge only the overlapping prefix.
    pub fn merge(&mut self, other: &HllRegisters) {
        debug_assert_eq!(
            self.registers.len(),
            other.registers.len(),
            "cannot merge HLL sketches of different precision"
        );
        for (mine, &theirs) in self.registers.iter_mut().zip(&other.registers) {
            *mine = (*mine).max(theirs);
        }
    }

    /// Estimate the cardinality.
    ///
    /// Computes the raw HyperLogLog estimate `alpha * m^2 / sum(2^-register)`
    /// and switches to linear counting while the raw estimate is at most
    /// `2.5 * m` and at least one register is still zero. An empty sketch
    /// therefore estimates exactly `0.0`.
    pub fn estimate(&self) -> f64 {
        let m = self.registers.len() as f64;

        // Denominator of the harmonic mean, plus the number of untouched registers.
        let mut sum = 0.0f64;
        let mut zeros = 0u32;
        for &r in &self.registers {
            sum += 2.0f64.powi(-i32::from(r));
            if r == 0 {
                zeros += 1;
            }
        }

        // Alpha constant for bias correction (tabulated for the small sizes).
        let alpha = match self.registers.len() {
            16 => 0.673,
            32 => 0.697,
            64 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / m),
        };

        let raw_estimate = alpha * m * m / sum;

        // Small range correction (linear counting).
        if raw_estimate <= 2.5 * m && zeros > 0 {
            m * (m / f64::from(zeros)).ln()
        } else {
            raw_estimate
        }
    }

    /// Serialize registers to bytes for merge transport.
    ///
    /// Layout: one precision byte followed by the raw registers.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(1 + self.registers.len());
        buf.push(self.precision);
        buf.extend_from_slice(&self.registers);
        buf
    }

    /// Deserialize registers from bytes produced by [`to_bytes`](Self::to_bytes).
    ///
    /// Returns `None` when `data` is empty, when the precision byte is too
    /// large to shift by, or when the payload does not contain exactly
    /// `2^precision` registers. Rejecting such input keeps the size invariant
    /// that `add_hash`, `merge` and `estimate` rely on.
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        let (&precision, registers) = data.split_first()?;
        let expected_len = 1usize.checked_shl(u32::from(precision))?;
        if registers.len() != expected_len {
            return None;
        }
        Some(Self {
            registers: registers.to_vec(),
            precision,
        })
    }
}

// --- Cardinality aggregation ---

/// Factory for the `cardinality` aggregation on a single field.
///
/// Creates one collector per segment reader and merges the serialized
/// per-segment sketches into a final estimate.
pub struct CardinalityAggFactory {
    /// Name of the field to count distinct values of; compared against the
    /// field names in each segment header.
    pub field_name: String,
    /// HLL precision handed to `HllRegisters::new`, i.e. the sketch uses
    /// `2^precision` registers.
    pub precision: u8,
}

impl AggregatorFactory for CardinalityAggFactory {
    /// Builds the collector for one segment.
    ///
    /// Looks up `field_name` in the segment header and opens the matching
    /// column. The collector starts with an empty sketch of the configured
    /// precision.
    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
        // `None` when this segment's header does not list the field.
        let field_id = reader
            .header()
            .fields
            .iter()
            .find(|f| f.field_name == self.field_name)
            .map(|f| f.field_id);

        let col = super::bucket::OwnedColumn::new(field_id, reader);

        Box::new(CardinalityCollector {
            hll: HllRegisters::new(self.precision),
            col,
        })
    }

    /// Combines per-segment results into one cardinality estimate.
    ///
    /// Only metric results that carry a decodable `merge_state` contribute.
    /// A decoded sketch is merged only if it has this factory's precision and
    /// register count, because registers of differently sized sketches do not
    /// describe the same buckets and would corrupt the estimate. When nothing
    /// could be merged the result holds no value; otherwise it holds the
    /// rounded estimate.
    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
        let mut merged = HllRegisters::new(self.precision);
        let mut has_data = false;

        for result in &results {
            let AggregationResult::Metric(metric) = result else {
                continue;
            };
            let Some(bytes) = metric.merge_state.as_deref() else {
                continue;
            };
            let Some(segment_hll) = HllRegisters::from_bytes(bytes) else {
                continue;
            };

            // Skip sketches that are not shaped like the accumulator.
            let compatible = segment_hll.precision == merged.precision
                && segment_hll.registers.len() == merged.registers.len();
            if !compatible {
                continue;
            }

            merged.merge(&segment_hll);
            has_data = true;
        }

        let value = has_data.then(|| merged.estimate().round());
        AggregationResult::Metric(MetricResult::single(value))
    }
}

/// Per-segment collector that feeds hashed field values into an HLL sketch.
struct CardinalityCollector {
    /// Sketch accumulating the hashes of every collected document's value.
    hll: HllRegisters,
    /// Column to read values from; when `None`, `collect` does nothing.
    col: Option<super::bucket::OwnedColumn>,
}

// SAFETY: NOTE(review): no justification is recorded for this impl. `Send` is
// asserted manually, presumably because `OwnedColumn` contains something that
// is not automatically `Send` — confirm that moving a collector (and the
// column it owns) to another thread is actually sound.
unsafe impl Send for CardinalityCollector {}

impl Aggregator for CardinalityCollector {
    /// Hashes the column value of `doc_id` into the sketch.
    ///
    /// The keyword ordinal is tried first; the numeric value is consulted only
    /// when there is no ordinal. Documents without a column, without a value,
    /// or with a NaN numeric value are skipped.
    ///
    /// NOTE(review): the ordinal itself is hashed, not the term it stands for.
    /// If ordinals are assigned per segment, equal terms from different
    /// segments would hash differently after merging — verify against
    /// `OwnedColumn::keyword_ordinal`.
    fn collect(&mut self, doc_id: DocId) {
        use xxhash_rust::xxh3::xxh3_64;

        let Some(col) = self.col.as_ref() else { return };
        let doc = doc_id.as_u32();

        let hash = match col.keyword_ordinal(doc) {
            Some(ord) => xxh3_64(&(ord as u64).to_le_bytes()),
            None => match col.numeric_value(doc) {
                Some(v) if !v.is_nan() => xxh3_64(&v.to_le_bytes()),
                _ => return,
            },
        };

        self.hll.add_hash(hash);
    }

    /// Produces the segment-level result.
    ///
    /// `value` is the rounded estimate for this segment alone, and
    /// `merge_state` carries the serialized sketch so that segments can be
    /// combined later.
    fn finish(self: Box<Self>) -> AggregationResult {
        let sketch = &self.hll;
        let metric = MetricResult {
            value: Some(sketch.estimate().round()),
            extra: std::collections::HashMap::new(),
            merge_state: Some(sketch.to_bytes()),
        };
        AggregationResult::Metric(metric)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use xxhash_rust::xxh3::xxh3_64;

    /// Precision shared by every sketch in these tests.
    const PRECISION: u8 = 14;

    /// Builds a sketch from the xxh3 hashes of the little-endian bytes of `values`.
    fn sketch_of(values: std::ops::Range<u64>) -> HllRegisters {
        let mut hll = HllRegisters::new(PRECISION);
        for v in values {
            hll.add_hash(xxh3_64(&v.to_le_bytes()));
        }
        hll
    }

    /// Relative deviation of `est` from the true cardinality `truth`.
    fn relative_error(est: f64, truth: f64) -> f64 {
        (est - truth).abs() / truth
    }

    /// An untouched sketch estimates exactly zero.
    #[test]
    fn hll_empty() {
        let hll = HllRegisters::new(PRECISION);
        assert_eq!(hll.estimate(), 0.0);
    }

    /// One inserted value estimates close to one.
    #[test]
    fn hll_single_value() {
        let mut hll = HllRegisters::new(PRECISION);
        hll.add_hash(xxh3_64(b"hello"));
        let est = hll.estimate();
        assert!((0.5..=2.0).contains(&est), "single value estimate: {est}");
    }

    /// 10K distinct values are estimated within 5%.
    #[test]
    fn hll_known_cardinality() {
        let est = sketch_of(0..10000).estimate();
        // HLL with p=14 has ~1% error rate for 10K values
        let error = relative_error(est, 10000.0);
        assert!(
            error < 0.05,
            "10K cardinality estimate: {est}, error: {error:.3}"
        );
    }

    /// Merging two disjoint 5K sketches estimates the 10K union within 5%.
    #[test]
    fn hll_merge() {
        let mut hll1 = sketch_of(0..5000);
        let hll2 = sketch_of(5000..10000);
        hll1.merge(&hll2);
        let est = hll1.estimate();
        let error = relative_error(est, 10000.0);
        assert!(
            error < 0.05,
            "merged 10K estimate: {est}, error: {error:.3}"
        );
    }

    /// Re-inserting the same value many times still estimates close to one.
    #[test]
    fn hll_duplicate_values() {
        let mut hll = HllRegisters::new(PRECISION);
        let hash = xxh3_64(b"same_value");
        (0..10000).for_each(|_| hll.add_hash(hash));
        let est = hll.estimate();
        assert!((0.5..=2.0).contains(&est), "duplicate values estimate: {est}");
    }
}