// datafusion_functions_aggregate/hyperloglog.rs
use ahash::RandomState;
38use std::hash::Hash;
39use std::marker::PhantomData;
40
/// Number of low hash bits used to select a register.
const HLL_P: usize = 14;
/// Number of hash bits left over once the `HLL_P` index bits are removed
/// from a 64-bit hash.
const HLL_Q: usize = u64::BITS as usize - HLL_P;
/// Total number of registers in a sketch (`2^HLL_P`).
const NUM_REGISTERS: usize = 1 << HLL_P;
/// Mask that keeps only the low `HLL_P` bits of a hash.
const HLL_P_MASK: u64 = NUM_REGISTERS as u64 - 1;
48
49#[derive(Clone, Debug)]
50pub(crate) struct HyperLogLog<T>
51where
52 T: Hash + ?Sized,
53{
54 registers: [u8; NUM_REGISTERS],
55 phantom: PhantomData<T>,
56}
57
58const SEED: RandomState = RandomState::with_seeds(
65 0x885f6cab121d01a3_u64,
66 0x71e4379f2976ad8f_u64,
67 0xbf30173dd28a8816_u64,
68 0x0eaea5d736d733a4_u64,
69);
70
71impl<T> Default for HyperLogLog<T>
72where
73 T: Hash + ?Sized,
74{
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80impl<T> HyperLogLog<T>
81where
82 T: Hash + ?Sized,
83{
84 pub fn new() -> Self {
86 let registers = [0; NUM_REGISTERS];
87 Self::new_with_registers(registers)
88 }
89
90 pub(crate) fn new_with_registers(registers: [u8; NUM_REGISTERS]) -> Self {
94 Self {
95 registers,
96 phantom: PhantomData,
97 }
98 }
99
100 #[inline]
104 fn hash_value(&self, obj: &T) -> u64 {
105 SEED.hash_one(obj)
106 }
107
108 pub fn add(&mut self, obj: &T) {
110 let hash = self.hash_value(obj);
111 let index = (hash & HLL_P_MASK) as usize;
112 let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
113 self.registers[index] = self.registers[index].max(p as u8);
114 }
115
116 #[inline]
119 fn get_histogram(&self) -> [u32; HLL_Q + 2] {
120 let mut histogram = [0; HLL_Q + 2];
121 for r in self.registers {
123 histogram[r as usize] += 1;
124 }
125 histogram
126 }
127
128 pub fn merge(&mut self, other: &HyperLogLog<T>) {
130 assert!(
131 self.registers.len() == other.registers.len(),
132 "unexpected got unequal register size, expect {}, got {}",
133 self.registers.len(),
134 other.registers.len()
135 );
136 for i in 0..self.registers.len() {
137 self.registers[i] = self.registers[i].max(other.registers[i]);
138 }
139 }
140
141 pub fn count(&self) -> usize {
143 let histogram = self.get_histogram();
144 let m = NUM_REGISTERS as f64;
145 let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m);
146 for i in histogram[1..=HLL_Q].iter().rev() {
147 z += *i as f64;
148 z *= 0.5;
149 }
150 z += m * hll_sigma(histogram[0] as f64 / m);
151 (0.5 / 2_f64.ln() * m * m / z).round() as usize
152 }
153}
154
/// Evaluates the series `x + sum_{k >= 1} x^(2^k) * 2^(k - 1)`.
///
/// Returns positive infinity when `x == 1.0`. Otherwise terms are added
/// until one more term no longer changes the running sum in `f64`.
#[inline]
fn hll_sigma(x: f64) -> f64 {
    if x == 1. {
        return f64::INFINITY;
    }

    let mut power = x; // squared on every pass: x^(2^k)
    let mut weight = 1.0; // doubled on every pass: 2^(k - 1)
    let mut sum = x;
    loop {
        power *= power;
        let previous = sum;
        sum += power * weight;
        weight += weight;
        // Stop once the newest term is too small to change the sum.
        if sum == previous {
            return sum;
        }
    }
}
178
/// Evaluates `(1 - x - sum_{k >= 1} (1 - x^(2^-k))^2 * 2^-k) / 3`.
///
/// Returns `0.0` when `x` is exactly `0.0` or `1.0`. Otherwise terms are
/// subtracted until one more term no longer changes the running value in
/// `f64`.
#[inline]
fn hll_tau(x: f64) -> f64 {
    if x == 0.0 || x == 1.0 {
        return 0.0;
    }

    let mut root = x; // square-rooted on every pass: x^(2^-k)
    let mut scale = 1.0; // halved on every pass: 2^-k
    let mut acc = 1.0 - x;
    loop {
        root = root.sqrt();
        let previous = acc;
        scale *= 0.5;
        acc -= (1.0 - root).powi(2) * scale;
        // Stop once the newest term is too small to change the accumulator.
        if acc == previous {
            return acc / 3.0;
        }
    }
}
202
203impl<T> AsRef<[u8]> for HyperLogLog<T>
204where
205 T: Hash + ?Sized,
206{
207 fn as_ref(&self) -> &[u8] {
208 &self.registers
209 }
210}
211
212impl<T> Extend<T> for HyperLogLog<T>
213where
214 T: Hash,
215{
216 fn extend<S: IntoIterator<Item = T>>(&mut self, iter: S) {
217 for elem in iter {
218 self.add(&elem);
219 }
220 }
221}
222
223impl<'a, T> Extend<&'a T> for HyperLogLog<T>
224where
225 T: 'a + Hash + ?Sized,
226{
227 fn extend<S: IntoIterator<Item = &'a T>>(&mut self, iter: S) {
228 for elem in iter {
229 self.add(elem);
230 }
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::{HyperLogLog, NUM_REGISTERS};

    /// Asserts that `got` lies within the allowed relative error of `expected`.
    ///
    /// The tolerance is `1.04 / sqrt(NUM_REGISTERS)` multiplied by six.
    fn compare_with_delta(got: usize, expected: usize) {
        let expected = expected as f64;
        let relative_error = ((got as f64) - expected).abs() / expected;
        let base_error = 1.04 / (NUM_REGISTERS as f64).sqrt();
        let margin = base_error * 6.0;
        let lower = expected * (1.0 - margin);
        let upper = expected * (1.0 + margin);
        assert!(
            relative_error <= margin,
            "{} is not near {} percent of {} which is ({}, {})",
            got,
            margin,
            expected,
            lower,
            upper
        );
    }

    // Adds the integers `0..$SIZE` as type `$T` and checks the estimate.
    macro_rules! sized_number_test {
        ($SIZE: expr, $T: tt) => {{
            let mut sketch = HyperLogLog::<$T>::new();
            for value in 0..$SIZE {
                sketch.add(&value);
            }
            let estimate = sketch.count();
            compare_with_delta(estimate, $SIZE);
        }};
    }

    // Runs `sized_number_test!` for the 64-bit and 128-bit integer types.
    macro_rules! typed_large_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u64);
            sized_number_test!($SIZE, u128);
            sized_number_test!($SIZE, i64);
            sized_number_test!($SIZE, i128);
        }};
    }

    // Runs `sized_number_test!` for the 16-bit and 32-bit integer types, then
    // for the large ones as well.
    macro_rules! typed_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u16);
            sized_number_test!($SIZE, u32);
            sized_number_test!($SIZE, i16);
            sized_number_test!($SIZE, i32);
            typed_large_number_test!($SIZE);
        }};
    }

    #[test]
    fn test_empty() {
        let sketch = HyperLogLog::<u64>::new();
        assert_eq!(sketch.count(), 0);
    }

    #[test]
    fn test_one() {
        let mut sketch = HyperLogLog::<u64>::new();
        sketch.add(&1);
        assert_eq!(sketch.count(), 1);
    }

    #[test]
    fn test_number_100() {
        typed_number_test!(100);
    }

    #[test]
    fn test_number_1k() {
        typed_number_test!(1_000);
    }

    #[test]
    fn test_number_10k() {
        typed_number_test!(10_000);
    }

    #[test]
    fn test_number_100k() {
        typed_large_number_test!(100_000);
    }

    #[test]
    fn test_number_1m() {
        typed_large_number_test!(1_000_000);
    }

    #[test]
    fn test_u8() {
        // Uses the unsized element type `[u8]` through `add`.
        let mut sketch = HyperLogLog::<[u8]>::new();
        for i in 0..1000 {
            let text = i.to_string();
            sketch.add(text.as_bytes());
        }
        compare_with_delta(sketch.count(), 1000);
    }

    #[test]
    fn test_string() {
        let mut sketch = HyperLogLog::<String>::new();
        let values = (0..1000).map(|i| i.to_string());
        sketch.extend(values);
        compare_with_delta(sketch.count(), 1000);
    }

    #[test]
    fn test_empty_merge() {
        let mut sketch = HyperLogLog::<u64>::new();
        let empty = HyperLogLog::<u64>::new();
        sketch.merge(&empty);
        assert_eq!(sketch.count(), 0);
    }

    #[test]
    fn test_merge_overlapped() {
        // Both sketches see the same 1000 strings, so the merged estimate
        // should still be close to 1000.
        let mut sketch = HyperLogLog::<String>::new();
        sketch.extend((0..1000).map(|i| i.to_string()));

        let mut other = HyperLogLog::<String>::new();
        other.extend((0..1000).map(|i| i.to_string()));

        sketch.merge(&other);
        compare_with_delta(sketch.count(), 1000);
    }

    #[test]
    fn test_repetition() {
        // One million insertions that cycle through only 1000 distinct values.
        let mut sketch = HyperLogLog::<u32>::new();
        for i in 0..1_000_000 {
            let value = i % 1000;
            sketch.add(&value);
        }
        compare_with_delta(sketch.count(), 1000);
    }
}