1use crate::core::DocId;
11
12use super::{AggregationResult, Aggregator, AggregatorFactory, MetricResult};
13use crate::segment::reader::SegmentReader;
14
/// Dense HyperLogLog sketch with `2^precision` one-byte registers.
///
/// A 64-bit hash is split in two: its low `precision` bits select a register,
/// and the remaining high bits determine a rank (one more than the number of
/// leading zeros in those bits). Each register keeps the largest rank routed to
/// it, and [`estimate`](Self::estimate) turns the registers into a cardinality
/// estimate.
#[derive(Clone, Debug)]
pub struct HllRegisters {
    /// Per-bucket maximum rank; always exactly `1 << precision` entries.
    registers: Vec<u8>,
    /// Number of low hash bits used as the register index.
    precision: u8,
}

impl HllRegisters {
    /// Smallest supported precision (16 registers), the first register count
    /// covered by the tabulated bias constants in [`estimate`](Self::estimate).
    pub const MIN_PRECISION: u8 = 4;
    /// Largest supported precision (262,144 registers, i.e. 256 KiB per sketch).
    pub const MAX_PRECISION: u8 = 18;

    /// Creates an empty sketch.
    ///
    /// `precision` is clamped to `MIN_PRECISION..=MAX_PRECISION`. Without the
    /// bound, values of 64 or more overflow the shifts in
    /// [`add_hash`](Self::add_hash), and large values allocate an impractical
    /// number of registers.
    pub fn new(precision: u8) -> Self {
        let precision = precision.clamp(Self::MIN_PRECISION, Self::MAX_PRECISION);
        Self {
            registers: vec![0; 1usize << precision],
            precision,
        }
    }

    /// Records one 64-bit hash of a value.
    pub fn add_hash(&mut self, hash: u64) {
        // The register count is a power of two, so the mask keeps the low
        // `precision` bits. `as usize` may truncate on 32-bit targets, but only
        // the low (at most 18) bits survive the mask anyway.
        let idx = (hash as usize) & (self.registers.len() - 1);
        // After the shift, `w` carries `precision` leading zeros that are not
        // part of the rank, so they are subtracted. For `w == 0`,
        // `leading_zeros` is 64 and the rank becomes `65 - precision`, the
        // standard HLL rank for an all-zero suffix.
        let w = hash >> self.precision;
        let rho = (w.leading_zeros() + 1 - u32::from(self.precision)) as u8;
        let slot = &mut self.registers[idx];
        if rho > *slot {
            *slot = rho;
        }
    }

    /// Merges `other` into `self`, yielding the sketch of the union of both
    /// input sets.
    ///
    /// Registers of different sizes cannot be combined one-to-one. When the
    /// precisions differ, the finer sketch is first folded down to the coarser
    /// precision, and `self` adopts that coarser precision.
    pub fn merge(&mut self, other: &HllRegisters) {
        if other.precision < self.precision {
            self.registers = self.folded_registers(other.precision);
            self.precision = other.precision;
        }
        let folded;
        let source: &[u8] = if other.precision > self.precision {
            folded = other.folded_registers(self.precision);
            &folded
        } else {
            &other.registers
        };
        for (dst, &src) in self.registers.iter_mut().zip(source) {
            *dst = (*dst).max(src);
        }
    }

    /// Re-buckets the registers to the coarser `target` precision.
    ///
    /// The register index is the low bits of the hash, so register `j` maps to
    /// `j & (2^target - 1)`. Ranks carry over unchanged. This is exact unless
    /// every hash bit above the old precision was zero (probability
    /// `2^-(64 - precision)` per hash). In that case the true coarse rank would
    /// be slightly larger.
    fn folded_registers(&self, target: u8) -> Vec<u8> {
        debug_assert!(target <= self.precision);
        let mask = (1usize << target) - 1;
        let mut out = vec![0u8; mask + 1];
        for (j, &rank) in self.registers.iter().enumerate() {
            let slot = &mut out[j & mask];
            *slot = (*slot).max(rank);
        }
        out
    }

    /// Returns the estimated number of distinct hashes added.
    ///
    /// Uses the raw HyperLogLog estimator, with linear counting for small
    /// cardinalities. No large-range correction is applied because hashes are
    /// 64 bits wide.
    pub fn estimate(&self) -> f64 {
        let m = self.registers.len() as f64;
        let mut sum = 0.0f64;
        let mut zeros = 0u32;
        for &r in &self.registers {
            sum += 2.0f64.powi(-i32::from(r));
            if r == 0 {
                zeros += 1;
            }
        }

        // Bias-correction constant alpha_m: tabulated for 16/32/64 registers;
        // the closed form is the published approximation for m >= 128.
        let alpha = match self.registers.len() {
            16 => 0.673,
            32 => 0.697,
            64 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / m),
        };

        let raw_estimate = alpha * m * m / sum;

        // Small-range correction: while some registers are still empty, linear
        // counting on the empty-register count is more accurate.
        if raw_estimate <= 2.5 * m && zeros > 0 {
            m * (m / f64::from(zeros)).ln()
        } else {
            raw_estimate
        }
    }

    /// Serializes the sketch as `[precision, registers...]`.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(1 + self.registers.len());
        buf.push(self.precision);
        buf.extend_from_slice(&self.registers);
        buf
    }

    /// Decodes bytes produced by [`to_bytes`](Self::to_bytes).
    ///
    /// Returns `None` if `data` is empty, if the precision byte is outside
    /// `MIN_PRECISION..=MAX_PRECISION`, or if the register count is not
    /// `2^precision`.
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        let (&precision, registers) = data.split_first()?;
        // The range check short-circuits before the shift, so it cannot overflow.
        if !(Self::MIN_PRECISION..=Self::MAX_PRECISION).contains(&precision)
            || registers.len() != 1usize << precision
        {
            return None;
        }
        Some(Self {
            registers: registers.to_vec(),
            precision,
        })
    }
}
109
110pub struct CardinalityAggFactory {
113 pub field_name: String,
114 pub precision: u8,
115}
116
117impl AggregatorFactory for CardinalityAggFactory {
118 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
119 let field_id = reader
120 .header()
121 .fields
122 .iter()
123 .find(|f| f.field_name == self.field_name)
124 .map(|f| f.field_id);
125
126 let col = super::bucket::OwnedColumn::new(field_id, reader);
127
128 Box::new(CardinalityCollector {
129 hll: HllRegisters::new(self.precision),
130 col,
131 })
132 }
133
134 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
135 let mut merged = HllRegisters::new(self.precision);
136 let mut has_data = false;
137
138 for r in &results {
139 if let AggregationResult::Metric(m) = r {
140 if let Some(ref bytes) = m.merge_state {
141 if let Some(segment_hll) = HllRegisters::from_bytes(bytes) {
142 merged.merge(&segment_hll);
143 has_data = true;
144 }
145 }
146 }
147 }
148
149 if !has_data {
150 AggregationResult::Metric(MetricResult::single(None))
151 } else {
152 AggregationResult::Metric(MetricResult::single(Some(merged.estimate().round())))
153 }
154 }
155}
156
157struct CardinalityCollector {
158 hll: HllRegisters,
159 col: Option<super::bucket::OwnedColumn>,
160}
161
162unsafe impl Send for CardinalityCollector {}
163
164impl Aggregator for CardinalityCollector {
165 fn collect(&mut self, doc_id: DocId) {
166 let Some(col) = self.col.as_ref() else { return };
167 let hash = if let Some(ord) = col.keyword_ordinal(doc_id.as_u32()) {
168 xxhash_rust::xxh3::xxh3_64(&(ord as u64).to_le_bytes())
169 } else if let Some(v) = col.numeric_value(doc_id.as_u32()) {
170 if v.is_nan() {
171 return;
172 }
173 xxhash_rust::xxh3::xxh3_64(&v.to_le_bytes())
174 } else {
175 return;
176 };
177 self.hll.add_hash(hash);
178 }
179
180 fn finish(self: Box<Self>) -> AggregationResult {
181 let estimate = self.hll.estimate();
182 AggregationResult::Metric(MetricResult {
183 value: Some(estimate.round()),
184 extra: std::collections::HashMap::new(),
185 merge_state: Some(self.hll.to_bytes()),
186 })
187 }
188}
189
/// Unit tests for the HyperLogLog sketch: accuracy, merging, duplicates and
/// serialization.
#[cfg(test)]
mod tests {
    use super::*;
    use xxhash_rust::xxh3::xxh3_64;

    #[test]
    fn hll_empty() {
        let hll = HllRegisters::new(14);
        assert_eq!(hll.estimate(), 0.0);
    }

    #[test]
    fn hll_single_value() {
        let mut hll = HllRegisters::new(14);
        hll.add_hash(xxh3_64(b"hello"));
        let est = hll.estimate();
        assert!((0.5..=2.0).contains(&est), "single value estimate: {est}");
    }

    #[test]
    fn hll_known_cardinality() {
        let mut hll = HllRegisters::new(14);
        for i in 0..10000u64 {
            hll.add_hash(xxh3_64(&i.to_le_bytes()));
        }
        let est = hll.estimate();
        let error = (est - 10000.0).abs() / 10000.0;
        // Standard error at precision 14 is ~0.8%, so 5% leaves ample margin.
        assert!(
            error < 0.05,
            "10K cardinality estimate: {est}, error: {error:.3}"
        );
    }

    #[test]
    fn hll_merge() {
        let mut hll1 = HllRegisters::new(14);
        let mut hll2 = HllRegisters::new(14);
        for i in 0..5000u64 {
            hll1.add_hash(xxh3_64(&i.to_le_bytes()));
        }
        for i in 5000..10000u64 {
            hll2.add_hash(xxh3_64(&i.to_le_bytes()));
        }
        hll1.merge(&hll2);
        let est = hll1.estimate();
        let error = (est - 10000.0).abs() / 10000.0;
        assert!(
            error < 0.05,
            "merged 10K estimate: {est}, error: {error:.3}"
        );
    }

    #[test]
    fn hll_merge_with_self_is_idempotent() {
        let mut hll = HllRegisters::new(14);
        for i in 0..1000u64 {
            hll.add_hash(xxh3_64(&i.to_le_bytes()));
        }
        let copy = hll.clone();
        hll.merge(&copy);
        assert_eq!(hll.to_bytes(), copy.to_bytes());
    }

    #[test]
    fn hll_duplicate_values() {
        let mut hll = HllRegisters::new(14);
        for _ in 0..10000 {
            hll.add_hash(xxh3_64(b"same_value"));
        }
        let est = hll.estimate();
        assert!((0.5..=2.0).contains(&est), "duplicate values estimate: {est}");
    }

    #[test]
    fn hll_bytes_round_trip() {
        let mut hll = HllRegisters::new(14);
        for i in 0..1000u64 {
            hll.add_hash(xxh3_64(&i.to_le_bytes()));
        }
        let bytes = hll.to_bytes();
        // One precision byte followed by 2^14 registers.
        assert_eq!(bytes.len(), 1 + (1 << 14));
        let decoded = HllRegisters::from_bytes(&bytes).expect("valid encoding");
        assert_eq!(decoded.to_bytes(), bytes);
        assert_eq!(decoded.estimate(), hll.estimate());
        assert!(HllRegisters::from_bytes(&[]).is_none());
    }
}
251}