datafusion_functions_aggregate/
hyperloglog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! # HyperLogLog
19//!
//! `hyperloglog` is a module that contains a modified version
//! of [redis's implementation](https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c)
//! with some modifications based on strong assumptions about its usage
//! within datafusion, so that the function can
//! be efficiently implemented.
25//!
26//! Specifically, like Redis's version, this HLL structure uses
27//! 2**14 = 16384 registers, which means the standard error is
28//! 1.04/(16384**0.5) = 0.8125%. Unlike Redis, the register takes
29//! up full [`u8`] size instead of a raw int* and thus saves some
30//! tricky bit shifting techniques used in the original version.
//! This results in a memory usage increase from 12KiB to 16KiB.
32//! Also only the dense version is adopted, so there's no automatic
33//! conversion, largely to simplify the code.
34//!
35//! This module also borrows some code structure from [pdatastructs.rs](https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs).
36
37use ahash::RandomState;
38use std::hash::Hash;
39use std::marker::PhantomData;
40
/// Precision parameter: the number of low-order hash bits used to select a
/// register. The greater is P, the smaller the error.
const HLL_P: usize = 14_usize;
/// The number of remaining hash bits (64 - P). `add` derives a register's
/// rank from these bits by counting the trailing zeros of the shifted hash.
const HLL_Q: usize = 64_usize - HLL_P;
/// Total number of registers (2**P).
const NUM_REGISTERS: usize = 1_usize << HLL_P;
/// Mask to obtain index into the registers
const HLL_P_MASK: u64 = (NUM_REGISTERS as u64) - 1;
48
/// A dense HyperLogLog sketch over values of type `T`.
///
/// Holds one `u8` register per bucket (`NUM_REGISTERS` in total). `T` is
/// only tracked at the type level through [`PhantomData`]; no value of
/// type `T` is stored in the sketch.
#[derive(Clone, Debug)]
pub(crate) struct HyperLogLog<T>
where
    T: Hash + ?Sized,
{
    /// One register per bucket, written by `add` and `merge`.
    registers: [u8; NUM_REGISTERS],
    /// Marker tying the sketch to the element type `T`.
    phantom: PhantomData<T>,
}
57
/// Fixed seed for the hashing so that values are consistent across runs.
///
/// Note that when we later move on to have serialized HLL register binaries
/// shared across a cluster, this SEED will have to be consistent across all
/// parties, otherwise we might have corruption. So ideally this seed shall
/// later be part of the serialized form (or stay unchanged across versions).
const SEED: RandomState = RandomState::with_seeds(
    0x885f6cab121d01a3_u64,
    0x71e4379f2976ad8f_u64,
    0xbf30173dd28a8816_u64,
    0x0eaea5d736d733a4_u64,
);
70
71impl<T> Default for HyperLogLog<T>
72where
73    T: Hash + ?Sized,
74{
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80impl<T> HyperLogLog<T>
81where
82    T: Hash + ?Sized,
83{
84    /// Creates a new, empty HyperLogLog.
85    pub fn new() -> Self {
86        let registers = [0; NUM_REGISTERS];
87        Self::new_with_registers(registers)
88    }
89
90    /// Creates a HyperLogLog from already populated registers
91    /// note that this method should not be invoked in untrusted environment
92    /// because the internal structure of registers are not examined.
93    pub(crate) fn new_with_registers(registers: [u8; NUM_REGISTERS]) -> Self {
94        Self {
95            registers,
96            phantom: PhantomData,
97        }
98    }
99
100    /// choice of hash function: ahash is already an dependency
101    /// and it fits the requirements of being a 64bit hash with
102    /// reasonable performance.
103    #[inline]
104    fn hash_value(&self, obj: &T) -> u64 {
105        SEED.hash_one(obj)
106    }
107
108    /// Adds an element to the HyperLogLog.
109    pub fn add(&mut self, obj: &T) {
110        let hash = self.hash_value(obj);
111        let index = (hash & HLL_P_MASK) as usize;
112        let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
113        self.registers[index] = self.registers[index].max(p as u8);
114    }
115
116    /// Get the register histogram (each value in register index into
117    /// the histogram; u32 is enough because we only have 2**14=16384 registers
118    #[inline]
119    fn get_histogram(&self) -> [u32; HLL_Q + 2] {
120        let mut histogram = [0; HLL_Q + 2];
121        // hopefully this can be unrolled
122        for r in self.registers {
123            histogram[r as usize] += 1;
124        }
125        histogram
126    }
127
128    /// Merge the other [`HyperLogLog`] into this one
129    pub fn merge(&mut self, other: &HyperLogLog<T>) {
130        assert!(
131            self.registers.len() == other.registers.len(),
132            "unexpected got unequal register size, expect {}, got {}",
133            self.registers.len(),
134            other.registers.len()
135        );
136        for i in 0..self.registers.len() {
137            self.registers[i] = self.registers[i].max(other.registers[i]);
138        }
139    }
140
141    /// Guess the number of unique elements seen by the HyperLogLog.
142    pub fn count(&self) -> usize {
143        let histogram = self.get_histogram();
144        let m = NUM_REGISTERS as f64;
145        let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m);
146        for i in histogram[1..=HLL_Q].iter().rev() {
147            z += *i as f64;
148            z *= 0.5;
149        }
150        z += m * hll_sigma(histogram[0] as f64 / m);
151        (0.5 / 2_f64.ln() * m * m / z).round() as usize
152    }
153}
154
/// Helper function sigma as defined in
/// "New cardinality estimation algorithms for HyperLogLog sketches"
/// Otmar Ertl, arXiv:1702.01284
///
/// Evaluates `x + sum_{k>=1} x^(2^k) * 2^(k-1)` by adding terms until the
/// running sum stops changing in `f64` precision. Returns infinity when
/// `x == 1`.
#[inline]
fn hll_sigma(x: f64) -> f64 {
    if x == 1. {
        return f64::INFINITY;
    }

    let mut power = x; // becomes x^(2^k) after the k-th squaring
    let mut weight = 1.0; // 2^(k-1), doubled after each term
    let mut sum = x;
    loop {
        power *= power;
        let previous = sum;
        sum += power * weight;
        weight += weight;
        // Converged once the latest term no longer moves the sum.
        if sum == previous {
            return sum;
        }
    }
}
178
/// Helper function tau as defined in
/// "New cardinality estimation algorithms for HyperLogLog sketches"
/// Otmar Ertl, arXiv:1702.01284
///
/// Evaluates `(1 - x - sum_{k>=1} (1 - x^(2^-k))^2 * 2^-k) / 3` by
/// subtracting terms until the running value stops changing in `f64`
/// precision. Returns `0` when `x` is exactly `0` or `1`.
#[inline]
fn hll_tau(x: f64) -> f64 {
    if x == 0.0 || x == 1.0 {
        return 0.0;
    }

    let mut root = x; // becomes x^(2^-k) after the k-th square root
    let mut weight = 1.0; // 2^-k, halved before each term
    let mut acc = 1.0 - x;
    loop {
        root = root.sqrt();
        let previous = acc;
        weight *= 0.5;
        acc -= (1.0 - root).powi(2) * weight;
        // Converged once the latest term no longer moves the accumulator.
        if acc == previous {
            return acc / 3.0;
        }
    }
}
202
203impl<T> AsRef<[u8]> for HyperLogLog<T>
204where
205    T: Hash + ?Sized,
206{
207    fn as_ref(&self) -> &[u8] {
208        &self.registers
209    }
210}
211
212impl<T> Extend<T> for HyperLogLog<T>
213where
214    T: Hash,
215{
216    fn extend<S: IntoIterator<Item = T>>(&mut self, iter: S) {
217        for elem in iter {
218            self.add(&elem);
219        }
220    }
221}
222
223impl<'a, T> Extend<&'a T> for HyperLogLog<T>
224where
225    T: 'a + Hash + ?Sized,
226{
227    fn extend<S: IntoIterator<Item = &'a T>>(&mut self, iter: S) {
228        for elem in iter {
229            self.add(elem);
230        }
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::{HyperLogLog, NUM_REGISTERS};

    /// Asserts that `got` is within the allowed relative error of `expected`.
    fn compare_with_delta(got: usize, expected: usize) {
        let expected = expected as f64;
        let diff = (got as f64) - expected;
        let diff = diff.abs() / expected;
        // times 6 because we want the tests to be stable
        // so we allow a rather large margin of error
        // this is adopted from redis's unit test version as well
        let margin = 1.04 / ((NUM_REGISTERS as f64).sqrt()) * 6.0;
        assert!(
            diff <= margin,
            // `margin` is a fraction, so scale it to report a real percentage.
            "{} is not within {}% of {} which is ({}, {})",
            got,
            margin * 100.0,
            expected,
            expected * (1.0 - margin),
            expected * (1.0 + margin)
        );
    }

    /// Adds the values `0..$SIZE` of type `$T` and checks the estimate.
    macro_rules! sized_number_test {
        ($SIZE: expr, $T: tt) => {{
            let mut hll = HyperLogLog::<$T>::new();
            for i in 0..$SIZE {
                hll.add(&i);
            }
            compare_with_delta(hll.count(), $SIZE);
        }};
    }

    /// Runs `sized_number_test!` for the 64-bit and 128-bit integer types.
    macro_rules! typed_large_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u64);
            sized_number_test!($SIZE, u128);
            sized_number_test!($SIZE, i64);
            sized_number_test!($SIZE, i128);
        }};
    }

    /// Runs `sized_number_test!` for the 16-bit and 32-bit integer types as
    /// well as the large ones; `$SIZE` must fit in the narrowest of them.
    macro_rules! typed_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u16);
            sized_number_test!($SIZE, u32);
            sized_number_test!($SIZE, i16);
            sized_number_test!($SIZE, i32);
            typed_large_number_test!($SIZE);
        }};
    }

    #[test]
    fn test_empty() {
        let hll = HyperLogLog::<u64>::new();
        assert_eq!(hll.count(), 0);
    }

    #[test]
    fn test_one() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.add(&1);
        assert_eq!(hll.count(), 1);
    }

    #[test]
    fn test_number_100() {
        typed_number_test!(100);
    }

    #[test]
    fn test_number_1k() {
        typed_number_test!(1_000);
    }

    #[test]
    fn test_number_10k() {
        typed_number_test!(10_000);
    }

    #[test]
    fn test_number_100k() {
        typed_large_number_test!(100_000);
    }

    #[test]
    fn test_number_1m() {
        typed_large_number_test!(1_000_000);
    }

    #[test]
    fn test_u8() {
        let mut hll = HyperLogLog::<[u8]>::new();
        for i in 0..1000 {
            let s = i.to_string();
            let b = s.as_bytes();
            hll.add(b);
        }
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_string() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_empty_merge() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.merge(&HyperLogLog::<u64>::new());
        assert_eq!(hll.count(), 0);
    }

    #[test]
    fn test_merge_overlapped() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));

        let mut other = HyperLogLog::<String>::new();
        other.extend((0..1000).map(|i| i.to_string()));

        hll.merge(&other);
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_merge_disjoint() {
        // Two sketches with no common element must merge to the sum of both.
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));

        let mut other = HyperLogLog::<String>::new();
        other.extend((1000..2000).map(|i| i.to_string()));

        hll.merge(&other);
        compare_with_delta(hll.count(), 2000);
    }

    #[test]
    fn test_repetition() {
        let mut hll = HyperLogLog::<u32>::new();
        for i in 0..1_000_000 {
            hll.add(&(i % 1000));
        }
        compare_with_delta(hll.count(), 1000);
    }
}
367        compare_with_delta(hll.count(), 1000);
368    }
369}