Skip to main content

luci/agg/
hll.rs

//! HyperLogLog++ cardinality estimation for the `cardinality` aggregation.
//!
//! Estimates the number of distinct values in a field using probabilistic
//! counting. Values are hashed to 64 bits with xxh3 (already a dependency),
//! and small cardinalities are corrected with linear counting. The HLL++
//! empirical bias-correction tables and sparse representation are not
//! implemented; the sketch is always a dense register array.
//!
//! See [[feature-aggregations-v010]] and the
//! [HyperLogLog++ paper](https://research.google/pubs/hyperloglog-in-practice-algorithmic-engineering-of-a-state-of-the-art-cardinality-estimation-algorithm/).
9
10use crate::core::DocId;
11
12use super::{AggregationResult, Aggregator, AggregatorFactory, MetricResult};
13use crate::segment::reader::SegmentReader;
14
/// Dense HyperLogLog sketch: one 8-bit register per bucket.
///
/// The sketch holds `2^precision` registers. Each register stores the largest
/// rank (`rho`) observed among the hashes routed to that bucket, where `rho` is
/// the 1-based position of the highest set bit in the non-index part of the
/// hash. A register value of `0` means no hash has landed in that bucket.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HllRegisters {
    /// Exactly `2^precision` registers.
    registers: Vec<u8>,
    /// Number of low hash bits used to select a register (always `< 64`).
    precision: u8,
}

impl HllRegisters {
    /// Creates an empty sketch with `2^precision` registers.
    ///
    /// # Panics
    ///
    /// Panics if `precision >= 64` (the 64-bit hash must keep at least one bit
    /// beyond the register index) or if `2^precision` does not fit in `usize`.
    pub fn new(precision: u8) -> Self {
        let m = Self::register_count(precision)
            .expect("HLL precision must be below 64 and 2^precision must fit in usize");
        Self {
            registers: vec![0; m],
            precision,
        }
    }

    /// Returns `2^precision`, or `None` when `precision` cannot be used.
    ///
    /// A precision is unusable when it is `>= 64` or when `2^precision`
    /// overflows `usize`.
    fn register_count(precision: u8) -> Option<usize> {
        if precision >= 64 {
            return None;
        }
        1usize.checked_shl(u32::from(precision))
    }

    /// Adds a pre-hashed value to the sketch.
    ///
    /// The low `precision` bits of `hash` select the register. The remaining
    /// `64 - precision` bits determine `rho`, the 1-based position of their
    /// highest set bit counted from the top. If those bits are all zero, `rho`
    /// takes its maximum value, `64 - precision + 1`.
    pub fn add_hash(&mut self, hash: u64) {
        let m = self.registers.len();
        let idx = (hash as usize) & (m - 1);
        // `w` has its top `precision` bits cleared, so `leading_zeros()` is at
        // least `precision`. Subtracting it leaves the zero run inside the
        // (64 - precision)-bit window. For `w == 0`, `leading_zeros()` is 64,
        // which gives the maximum rank 64 - precision + 1 without a special case.
        let w = hash >> self.precision;
        let rho = (w.leading_zeros() - u32::from(self.precision) + 1) as u8;
        let register = &mut self.registers[idx];
        *register = (*register).max(rho);
    }

    /// Merges `other` into `self` by taking the per-register maximum.
    ///
    /// Both sketches must have the same precision. Registers of sketches with
    /// different precisions index different hash bits and cannot be combined
    /// this way. This precondition is checked in debug builds only.
    pub fn merge(&mut self, other: &HllRegisters) {
        debug_assert_eq!(
            self.precision, other.precision,
            "cannot merge HLL sketches with different precisions"
        );
        for (mine, &theirs) in self.registers.iter_mut().zip(&other.registers) {
            *mine = (*mine).max(theirs);
        }
    }

    /// Returns the estimated number of distinct hashes added.
    ///
    /// Computes the raw HyperLogLog estimate `alpha * m^2 / sum(2^-register)`.
    /// It switches to linear counting, `m * ln(m / zeros)`, when the raw
    /// estimate is at most `2.5 * m` and at least one register is still empty.
    /// No large-range correction is applied, since hashes are 64-bit. The HLL++
    /// empirical bias correction is not implemented.
    pub fn estimate(&self) -> f64 {
        let m = self.registers.len() as f64;
        // Sum for the harmonic mean of 2^(-register[i]), plus the count of
        // empty registers for linear counting.
        let mut sum = 0.0f64;
        let mut zeros = 0usize;
        for &r in &self.registers {
            sum += 2.0f64.powi(-(r as i32));
            if r == 0 {
                zeros += 1;
            }
        }

        // Alpha bias constant from the original HyperLogLog analysis.
        let alpha = match self.registers.len() {
            16 => 0.673,
            32 => 0.697,
            64 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / m),
        };

        let raw_estimate = alpha * m * m / sum;

        if raw_estimate <= 2.5 * m && zeros > 0 {
            // Small-range correction: linear counting over the empty registers.
            m * (m / zeros as f64).ln()
        } else {
            raw_estimate
        }
    }

    /// Serializes the sketch as `[precision, registers...]`.
    ///
    /// This is the format exchanged between segment collectors and the merge step.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(1 + self.registers.len());
        buf.push(self.precision);
        buf.extend_from_slice(&self.registers);
        buf
    }

    /// Deserializes a sketch produced by [`HllRegisters::to_bytes`].
    ///
    /// Returns `None` if any of these holds:
    /// - `data` is empty;
    /// - the precision byte is unusable (see [`HllRegisters::new`]);
    /// - the register count is not `2^precision`;
    /// - a register holds a rank above `64 - precision + 1`.
    ///
    /// Malformed input would otherwise make `add_hash` panic or `estimate`
    /// return NaN or a meaningless value.
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        let (&precision, registers) = data.split_first()?;
        let expected_len = Self::register_count(precision)?;
        // `register_count` guarantees `precision < 64`, so this cannot underflow.
        let max_rank = 65 - precision;
        if registers.len() != expected_len || registers.iter().any(|&r| r > max_rank) {
            return None;
        }
        Some(Self {
            registers: registers.to_vec(),
            precision,
        })
    }
}
109
// --- Cardinality aggregation ---
111
/// Factory for the `cardinality` metric aggregation.
///
/// Builds one collector per segment and merges the collectors' serialized
/// HyperLogLog sketches into a single distinct-count estimate.
pub struct CardinalityAggFactory {
    /// Name of the field whose distinct values are counted.
    pub field_name: String,
    /// HyperLogLog precision; each sketch uses `2^precision` one-byte registers.
    pub precision: u8,
}
116
117impl AggregatorFactory for CardinalityAggFactory {
118    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
119        let field_id = reader
120            .header()
121            .fields
122            .iter()
123            .find(|f| f.field_name == self.field_name)
124            .map(|f| f.field_id);
125
126        let col = super::bucket::OwnedColumn::new(field_id, reader);
127
128        Box::new(CardinalityCollector {
129            hll: HllRegisters::new(self.precision),
130            col,
131        })
132    }
133
134    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
135        let mut merged = HllRegisters::new(self.precision);
136        let mut has_data = false;
137
138        for r in &results {
139            if let AggregationResult::Metric(m) = r {
140                if let Some(ref bytes) = m.merge_state {
141                    if let Some(segment_hll) = HllRegisters::from_bytes(bytes) {
142                        merged.merge(&segment_hll);
143                        has_data = true;
144                    }
145                }
146            }
147        }
148
149        if !has_data {
150            AggregationResult::Metric(MetricResult::single(None))
151        } else {
152            AggregationResult::Metric(MetricResult::single(Some(merged.estimate().round())))
153        }
154    }
155}
156
157struct CardinalityCollector {
158    hll: HllRegisters,
159    col: Option<super::bucket::OwnedColumn>,
160}
161
162unsafe impl Send for CardinalityCollector {}
163
164impl Aggregator for CardinalityCollector {
165    fn collect(&mut self, doc_id: DocId) {
166        let Some(col) = self.col.as_ref() else { return };
167        let hash = if let Some(ord) = col.keyword_ordinal(doc_id.as_u32()) {
168            xxhash_rust::xxh3::xxh3_64(&(ord as u64).to_le_bytes())
169        } else if let Some(v) = col.numeric_value(doc_id.as_u32()) {
170            if v.is_nan() {
171                return;
172            }
173            xxhash_rust::xxh3::xxh3_64(&v.to_le_bytes())
174        } else {
175            return;
176        };
177        self.hll.add_hash(hash);
178    }
179
180    fn finish(self: Box<Self>) -> AggregationResult {
181        let estimate = self.hll.estimate();
182        AggregationResult::Metric(MetricResult {
183            value: Some(estimate.round()),
184            extra: std::collections::HashMap::new(),
185            merge_state: Some(self.hll.to_bytes()),
186        })
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashes a `u64` the way these tests feed distinct values into a sketch.
    fn hash_u64(value: u64) -> u64 {
        xxhash_rust::xxh3::xxh3_64(&value.to_le_bytes())
    }

    /// Relative error of `estimate` against the true cardinality `expected`.
    fn relative_error(estimate: f64, expected: f64) -> f64 {
        (estimate - expected).abs() / expected
    }

    #[test]
    fn hll_empty() {
        let hll = HllRegisters::new(14);
        assert_eq!(hll.estimate(), 0.0);
    }

    #[test]
    fn hll_single_value() {
        let mut hll = HllRegisters::new(14);
        hll.add_hash(xxhash_rust::xxh3::xxh3_64(b"hello"));
        let est = hll.estimate();
        assert!((0.5..=2.0).contains(&est), "single value estimate: {est}");
    }

    #[test]
    fn hll_known_cardinality() {
        let mut hll = HllRegisters::new(14);
        for i in 0..10_000u64 {
            hll.add_hash(hash_u64(i));
        }
        let est = hll.estimate();
        // With p = 14, the standard error is about 1.04 / sqrt(2^14), i.e. ~0.8%.
        // The 5% bound leaves ample headroom.
        let error = relative_error(est, 10_000.0);
        assert!(
            error < 0.05,
            "10K cardinality estimate: {est}, error: {error:.3}"
        );
    }

    #[test]
    fn hll_merge() {
        let mut hll1 = HllRegisters::new(14);
        let mut hll2 = HllRegisters::new(14);
        for i in 0..5_000u64 {
            hll1.add_hash(hash_u64(i));
        }
        for i in 5_000..10_000u64 {
            hll2.add_hash(hash_u64(i));
        }
        hll1.merge(&hll2);
        let est = hll1.estimate();
        let error = relative_error(est, 10_000.0);
        assert!(
            error < 0.05,
            "merged 10K estimate: {est}, error: {error:.3}"
        );
    }

    #[test]
    fn hll_duplicate_values() {
        let mut hll = HllRegisters::new(14);
        for _ in 0..10_000 {
            hll.add_hash(xxhash_rust::xxh3::xxh3_64(b"same_value"));
        }
        let est = hll.estimate();
        assert!((0.5..=2.0).contains(&est), "duplicate values estimate: {est}");
    }

    #[test]
    fn hll_bytes_roundtrip() {
        let mut hll = HllRegisters::new(10);
        for i in 0..1_000u64 {
            hll.add_hash(hash_u64(i));
        }
        let bytes = hll.to_bytes();
        let restored = HllRegisters::from_bytes(&bytes).expect("to_bytes output must decode");
        assert_eq!(restored.to_bytes(), bytes);
        assert_eq!(restored.estimate(), hll.estimate());
        assert!(HllRegisters::from_bytes(&[]).is_none());
    }
}