// datafusion_functions_aggregate/hyperloglog.rs
use std::hash::BuildHasher;
use std::hash::Hash;
use std::marker::PhantomData;
40
/// Precision of the sketch: the number of low-order hash bits that select a register.
const HLL_P: usize = 14_usize;
/// Number of bits of a 64-bit hash that remain once the register index is removed.
const HLL_Q: usize = 64_usize - HLL_P;
/// Total number of registers in a sketch (`2^HLL_P`).
const NUM_REGISTERS: usize = 1_usize << HLL_P;
/// Mask that extracts the register index (the low `HLL_P` bits) from a hash.
const HLL_P_MASK: u64 = (NUM_REGISTERS as u64) - 1;
48
49#[derive(Clone, Debug)]
50pub(crate) struct HyperLogLog<T>
51where
52 T: Hash + ?Sized,
53{
54 registers: [u8; NUM_REGISTERS],
55 phantom: PhantomData<T>,
56}
57
58pub(crate) const HLL_HASH_STATE: foldhash::quality::FixedState =
65 foldhash::quality::FixedState::with_seed(0);
66
67impl<T> Default for HyperLogLog<T>
68where
69 T: Hash + ?Sized,
70{
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl<T> HyperLogLog<T>
77where
78 T: Hash + ?Sized,
79{
80 pub fn new() -> Self {
82 let registers = [0; NUM_REGISTERS];
83 Self::new_with_registers(registers)
84 }
85
86 pub(crate) fn new_with_registers(registers: [u8; NUM_REGISTERS]) -> Self {
90 Self {
91 registers,
92 phantom: PhantomData,
93 }
94 }
95
96 #[inline]
100 fn hash_value(&self, obj: &T) -> u64 {
101 HLL_HASH_STATE.hash_one(obj)
102 }
103
104 pub fn add(&mut self, obj: &T) {
106 let hash = self.hash_value(obj);
107 self.add_hashed(hash);
108 }
109
110 #[inline]
115 pub(crate) fn add_hashed(&mut self, hash: u64) {
116 let index = (hash & HLL_P_MASK) as usize;
117 let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
118 self.registers[index] = self.registers[index].max(p as u8);
119 }
120
121 #[inline]
124 fn get_histogram(&self) -> [u32; HLL_Q + 2] {
125 let mut histogram = [0; HLL_Q + 2];
126 for r in self.registers {
128 histogram[r as usize] += 1;
129 }
130 histogram
131 }
132
133 pub fn merge(&mut self, other: &HyperLogLog<T>) {
135 assert!(
136 self.registers.len() == other.registers.len(),
137 "unexpected got unequal register size, expect {}, got {}",
138 self.registers.len(),
139 other.registers.len()
140 );
141 for i in 0..self.registers.len() {
142 self.registers[i] = self.registers[i].max(other.registers[i]);
143 }
144 }
145
146 pub fn count(&self) -> usize {
148 let histogram = self.get_histogram();
149 let m = NUM_REGISTERS as f64;
150 let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m);
151 for i in histogram[1..=HLL_Q].iter().rev() {
152 z += *i as f64;
153 z *= 0.5;
154 }
155 z += m * hll_sigma(histogram[0] as f64 / m);
156 (0.5 / 2_f64.ln() * m * m / z).round() as usize
157 }
158}
159
/// Evaluates `sigma(x) = x + sum_{k>=1} x^(2^k) * 2^(k-1)`.
///
/// Returns `f64::INFINITY` for `x == 1.0`. Otherwise terms are added until the
/// running sum stops changing in `f64` precision.
///
/// NOTE(review): the only caller in this file passes a ratio in `[0, 1]`; a
/// NaN input would never satisfy the stopping test — confirm no other callers.
#[inline]
fn hll_sigma(x: f64) -> f64 {
    if x == 1. {
        return f64::INFINITY;
    }
    // `power` holds x^(2^k) and `weight` holds 2^(k-1) for the current k.
    let mut power = x;
    let mut weight = 1.0;
    let mut sum = x;
    loop {
        power *= power;
        let previous = sum;
        sum += power * weight;
        weight += weight;
        if previous == sum {
            return sum;
        }
    }
}
183
/// Evaluates `tau(x) = (1 - x - sum_{k>=1} (1 - x^(2^-k))^2 * 2^-k) / 3`.
///
/// Returns `0.0` for `x == 0.0` and `x == 1.0`. Otherwise terms are subtracted
/// until the running value stops changing in `f64` precision.
///
/// NOTE(review): the only caller in this file passes a ratio in `[0, 1]`; a
/// NaN input would never satisfy the stopping test — confirm no other callers.
#[inline]
fn hll_tau(x: f64) -> f64 {
    if x == 0.0 || x == 1.0 {
        return 0.0;
    }
    // `root` holds x^(2^-k) and `weight` holds 2^-k for the current k.
    let mut root = x;
    let mut weight = 1.0;
    let mut acc = 1.0 - x;
    loop {
        root = root.sqrt();
        let previous = acc;
        weight *= 0.5;
        acc -= (1.0 - root).powi(2) * weight;
        if previous == acc {
            return acc / 3.0;
        }
    }
}
207
208impl<T> AsRef<[u8]> for HyperLogLog<T>
209where
210 T: Hash + ?Sized,
211{
212 fn as_ref(&self) -> &[u8] {
213 &self.registers
214 }
215}
216
217impl<T> Extend<T> for HyperLogLog<T>
218where
219 T: Hash,
220{
221 fn extend<S: IntoIterator<Item = T>>(&mut self, iter: S) {
222 for elem in iter {
223 self.add(&elem);
224 }
225 }
226}
227
228impl<'a, T> Extend<&'a T> for HyperLogLog<T>
229where
230 T: 'a + Hash + ?Sized,
231{
232 fn extend<S: IntoIterator<Item = &'a T>>(&mut self, iter: S) {
233 for elem in iter {
234 self.add(elem);
235 }
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::{HyperLogLog, NUM_REGISTERS};

    /// Asserts that `got` lies within the allowed relative error of `expected`.
    fn compare_with_delta(got: usize, expected: usize) {
        let expected = expected as f64;
        let diff = (got as f64) - expected;
        let diff = diff.abs() / expected;
        // Allow six times 1.04 / sqrt(NUM_REGISTERS); presumably the usual
        // HyperLogLog standard-error figure — confirm.
        let margin = 1.04 / ((NUM_REGISTERS as f64).sqrt()) * 6.0;
        assert!(
            diff <= margin,
            "{} is not near {} percent of {} which is ({}, {})",
            got,
            margin,
            expected,
            expected * (1.0 - margin),
            expected * (1.0 + margin)
        );
    }

    /// Adds the integers `0..$SIZE` of type `$T` and checks the estimate.
    macro_rules! sized_number_test {
        ($SIZE: expr, $T: tt) => {{
            let mut hll = HyperLogLog::<$T>::new();
            for i in 0..$SIZE {
                hll.add(&i);
            }
            compare_with_delta(hll.count(), $SIZE);
        }};
    }

    /// Runs `sized_number_test!` for the 64- and 128-bit integer types.
    macro_rules! typed_large_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u64);
            sized_number_test!($SIZE, u128);
            sized_number_test!($SIZE, i64);
            sized_number_test!($SIZE, i128);
        }};
    }

    /// Runs `sized_number_test!` for the 16- and 32-bit types, then the large ones.
    macro_rules! typed_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u16);
            sized_number_test!($SIZE, u32);
            sized_number_test!($SIZE, i16);
            sized_number_test!($SIZE, i32);
            typed_large_number_test!($SIZE);
        }};
    }

    #[test]
    fn test_empty() {
        let hll = HyperLogLog::<u64>::new();
        assert_eq!(hll.count(), 0);
    }

    #[test]
    fn test_one() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.add(&1);
        assert_eq!(hll.count(), 1);
    }

    #[test]
    fn test_number_100() {
        typed_number_test!(100);
    }

    #[test]
    fn test_number_1k() {
        typed_number_test!(1_000);
    }

    #[test]
    fn test_number_10k() {
        typed_number_test!(10_000);
    }

    #[test]
    fn test_number_100k() {
        // Only the wide integer types are exercised at this size.
        typed_large_number_test!(100_000);
    }

    #[test]
    fn test_number_1m() {
        typed_large_number_test!(1_000_000);
    }

    #[test]
    fn test_u8() {
        // Exercises an unsized element type (`[u8]`).
        let mut hll = HyperLogLog::<[u8]>::new();
        for i in 0..1000 {
            let s = i.to_string();
            let b = s.as_bytes();
            hll.add(b);
        }
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_string() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_empty_merge() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.merge(&HyperLogLog::<u64>::new());
        assert_eq!(hll.count(), 0);
    }

    #[test]
    fn test_merge_overlapped() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));

        let mut other = HyperLogLog::<String>::new();
        other.extend((0..1000).map(|i| i.to_string()));

        // Both sketches saw the same values, so the merged estimate must not grow.
        hll.merge(&other);
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_repetition() {
        let mut hll = HyperLogLog::<u32>::new();
        // One million adds, but only 1000 distinct values.
        for i in 0..1_000_000 {
            hll.add(&(i % 1000));
        }
        compare_with_delta(hll.count(), 1000);
    }
}