Skip to main content

datafusion_functions_aggregate/
hyperloglog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! # HyperLogLog
19//!
//! `hyperloglog` is a module that contains a modified version
//! of [redis's implementation](https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c)
//! with some modifications based on strong assumptions about its usage
//! within DataFusion, so that it can
//! be implemented efficiently.
25//!
//! Specifically, like Redis's version, this HLL structure uses
//! 2**14 = 16384 registers, which means the standard error is
//! 1.04/(16384**0.5) = 0.8125%. Unlike Redis, each register takes
//! up a full [`u8`] instead of being packed into 6 bits, which avoids the
//! tricky bit shifting techniques used in the original version.
//! This results in a memory usage increase from 12 KiB to 16 KiB.
//! Also, only the dense representation is adopted, so there is no automatic
//! conversion between representations, largely to simplify the code.
34//!
35//! This module also borrows some code structure from [pdatastructs.rs](https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs).
36
37use std::hash::BuildHasher;
38use std::hash::Hash;
39use std::marker::PhantomData;
40
/// Number of low hash bits used as the register index.
/// The greater P is, the smaller the error.
const HLL_P: usize = 14;
/// Number of remaining hash bits, used to determine the zero-run length
/// (rank) that is stored in a register.
const HLL_Q: usize = 64 - HLL_P;
/// Total number of registers, i.e. `2**HLL_P`.
const NUM_REGISTERS: usize = 1 << HLL_P;
/// Mask to obtain the index into the registers from a hash value.
const HLL_P_MASK: u64 = NUM_REGISTERS as u64 - 1;
48
49#[derive(Clone, Debug)]
50pub(crate) struct HyperLogLog<T>
51where
52    T: Hash + ?Sized,
53{
54    registers: [u8; NUM_REGISTERS],
55    phantom: PhantomData<T>,
56}
57
/// Fixed seed for the hashing so that values are consistent across runs.
///
/// Note that if serialized HLL register binaries are later shared across a
/// cluster, this `HLL_HASH_STATE` will have to be consistent across all
/// parties, otherwise the shared sketches may be corrupted. Ideally the seed
/// should then become part of the serialized form (or stay unchanged across
/// versions).
pub(crate) const HLL_HASH_STATE: foldhash::quality::FixedState =
    foldhash::quality::FixedState::with_seed(0);
66
67impl<T> Default for HyperLogLog<T>
68where
69    T: Hash + ?Sized,
70{
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl<T> HyperLogLog<T>
77where
78    T: Hash + ?Sized,
79{
80    /// Creates a new, empty HyperLogLog.
81    pub fn new() -> Self {
82        let registers = [0; NUM_REGISTERS];
83        Self::new_with_registers(registers)
84    }
85
86    /// Creates a HyperLogLog from already populated registers
87    /// note that this method should not be invoked in untrusted environment
88    /// because the internal structure of registers are not examined.
89    pub(crate) fn new_with_registers(registers: [u8; NUM_REGISTERS]) -> Self {
90        Self {
91            registers,
92            phantom: PhantomData,
93        }
94    }
95
96    /// choice of hash function: foldhash is already an dependency
97    /// and it fits the requirements of being a 64bit hash with
98    /// reasonable performance.
99    #[inline]
100    fn hash_value(&self, obj: &T) -> u64 {
101        HLL_HASH_STATE.hash_one(obj)
102    }
103
104    /// Adds an element to the HyperLogLog.
105    pub fn add(&mut self, obj: &T) {
106        let hash = self.hash_value(obj);
107        self.add_hashed(hash);
108    }
109
110    /// Adds a pre-computed hash value directly to the HyperLogLog.
111    ///
112    /// The hash should be computed using [`HLL_HASH_STATE`], the same hasher used
113    /// by [`Self::add`].
114    #[inline]
115    pub(crate) fn add_hashed(&mut self, hash: u64) {
116        let index = (hash & HLL_P_MASK) as usize;
117        let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
118        self.registers[index] = self.registers[index].max(p as u8);
119    }
120
121    /// Get the register histogram (each value in register index into
122    /// the histogram; u32 is enough because we only have 2**14=16384 registers
123    #[inline]
124    fn get_histogram(&self) -> [u32; HLL_Q + 2] {
125        let mut histogram = [0; HLL_Q + 2];
126        // hopefully this can be unrolled
127        for r in self.registers {
128            histogram[r as usize] += 1;
129        }
130        histogram
131    }
132
133    /// Merge the other [`HyperLogLog`] into this one
134    pub fn merge(&mut self, other: &HyperLogLog<T>) {
135        assert!(
136            self.registers.len() == other.registers.len(),
137            "unexpected got unequal register size, expect {}, got {}",
138            self.registers.len(),
139            other.registers.len()
140        );
141        for i in 0..self.registers.len() {
142            self.registers[i] = self.registers[i].max(other.registers[i]);
143        }
144    }
145
146    /// Guess the number of unique elements seen by the HyperLogLog.
147    pub fn count(&self) -> usize {
148        let histogram = self.get_histogram();
149        let m = NUM_REGISTERS as f64;
150        let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m);
151        for i in histogram[1..=HLL_Q].iter().rev() {
152            z += *i as f64;
153            z *= 0.5;
154        }
155        z += m * hll_sigma(histogram[0] as f64 / m);
156        (0.5 / 2_f64.ln() * m * m / z).round() as usize
157    }
158}
159
/// Helper function sigma as defined in
/// "New cardinality estimation algorithms for HyperLogLog sketches"
/// Otmar Ertl, arXiv:1702.01284
///
/// Returns positive infinity for `x == 1`; otherwise accumulates the series
/// term by term until adding another term no longer changes the sum.
#[inline]
fn hll_sigma(x: f64) -> f64 {
    if x == 1. {
        return f64::INFINITY;
    }

    // `power` is squared on every round while `weight` doubles.
    let mut power = x;
    let mut weight = 1.0;
    let mut sum = x;
    loop {
        power *= power;
        let previous = sum;
        sum += power * weight;
        weight += weight;
        // Converged: the latest term was too small to alter the sum.
        if sum == previous {
            return sum;
        }
    }
}
183
/// Helper function tau as defined in
/// "New cardinality estimation algorithms for HyperLogLog sketches"
/// Otmar Ertl, arXiv:1702.01284
///
/// Returns `0` for `x == 0` and `x == 1`; otherwise refines the value term by
/// term until subtracting another term no longer changes it.
#[inline]
fn hll_tau(x: f64) -> f64 {
    if x == 0.0 || x == 1.0 {
        return 0.0;
    }

    // `root` takes repeated square roots of `x` while `weight` halves.
    let mut root = x;
    let mut weight = 1.0;
    let mut acc = 1.0 - x;
    loop {
        root = root.sqrt();
        let previous = acc;
        weight *= 0.5;
        acc -= (1.0 - root).powi(2) * weight;
        // Converged: the latest term was too small to alter the accumulator.
        if acc == previous {
            return acc / 3.0;
        }
    }
}
207
208impl<T> AsRef<[u8]> for HyperLogLog<T>
209where
210    T: Hash + ?Sized,
211{
212    fn as_ref(&self) -> &[u8] {
213        &self.registers
214    }
215}
216
217impl<T> Extend<T> for HyperLogLog<T>
218where
219    T: Hash,
220{
221    fn extend<S: IntoIterator<Item = T>>(&mut self, iter: S) {
222        for elem in iter {
223            self.add(&elem);
224        }
225    }
226}
227
228impl<'a, T> Extend<&'a T> for HyperLogLog<T>
229where
230    T: 'a + Hash + ?Sized,
231{
232    fn extend<S: IntoIterator<Item = &'a T>>(&mut self, iter: S) {
233        for elem in iter {
234            self.add(elem);
235        }
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::{HyperLogLog, NUM_REGISTERS};

    /// Asserts that the estimate `got` lies within the allowed relative error
    /// of the true cardinality `expected`.
    fn compare_with_delta(got: usize, expected: usize) {
        let expected = expected as f64;
        let diff = (got as f64) - expected;
        let diff = diff.abs() / expected;
        // times 6 because we want the tests to be stable
        // so we allow a rather large margin of error
        // this is adopted from redis's unit test version as well
        let margin = 1.04 / ((NUM_REGISTERS as f64).sqrt()) * 6.0;
        assert!(
            diff <= margin,
            // `margin` is a fraction, so scale it to report a true percentage
            "{} is not within {} percent of {} which is ({}, {})",
            got,
            margin * 100.0,
            expected,
            expected * (1.0 - margin),
            expected * (1.0 + margin)
        );
    }

    /// Adds the integers `0..$SIZE` of type `$T` to a fresh sketch and checks
    /// that the estimate is close to `$SIZE`.
    macro_rules! sized_number_test {
        ($SIZE: expr, $T: tt) => {{
            let mut hll = HyperLogLog::<$T>::new();
            for i in 0..$SIZE {
                hll.add(&i);
            }
            compare_with_delta(hll.count(), $SIZE);
        }};
    }

    /// Runs `sized_number_test` for the 64 and 128 bit integer types.
    macro_rules! typed_large_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u64);
            sized_number_test!($SIZE, u128);
            sized_number_test!($SIZE, i64);
            sized_number_test!($SIZE, i128);
        }};
    }

    /// Runs `sized_number_test` for the 16 and 32 bit integer types and then
    /// for the large ones; `$SIZE` must therefore fit into an `i16`.
    macro_rules! typed_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u16);
            sized_number_test!($SIZE, u32);
            sized_number_test!($SIZE, i16);
            sized_number_test!($SIZE, i32);
            typed_large_number_test!($SIZE);
        }};
    }

    #[test]
    fn test_empty() {
        let hll = HyperLogLog::<u64>::new();
        assert_eq!(hll.count(), 0);
    }

    #[test]
    fn test_one() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.add(&1);
        assert_eq!(hll.count(), 1);
    }

    #[test]
    fn test_number_100() {
        typed_number_test!(100);
    }

    #[test]
    fn test_number_1k() {
        typed_number_test!(1_000);
    }

    #[test]
    fn test_number_10k() {
        typed_number_test!(10_000);
    }

    #[test]
    fn test_number_100k() {
        typed_large_number_test!(100_000);
    }

    #[test]
    fn test_number_1m() {
        typed_large_number_test!(1_000_000);
    }

    // Exercises an unsized element type (`[u8]`).
    #[test]
    fn test_u8() {
        let mut hll = HyperLogLog::<[u8]>::new();
        for i in 0..1000 {
            let s = i.to_string();
            let b = s.as_bytes();
            hll.add(b);
        }
        compare_with_delta(hll.count(), 1000);
    }

    // Exercises the `Extend` implementation for owned elements.
    #[test]
    fn test_string() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_empty_merge() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.merge(&HyperLogLog::<u64>::new());
        assert_eq!(hll.count(), 0);
    }

    // Merging two sketches built from the same values must not change the
    // estimate.
    #[test]
    fn test_merge_overlapped() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));

        let mut other = HyperLogLog::<String>::new();
        other.extend((0..1000).map(|i| i.to_string()));

        hll.merge(&other);
        compare_with_delta(hll.count(), 1000);
    }

    // Adding the same 1000 distinct values many times over must still
    // estimate roughly 1000.
    #[test]
    fn test_repetition() {
        let mut hll = HyperLogLog::<u32>::new();
        for i in 0..1_000_000 {
            hll.add(&(i % 1000));
        }
        compare_with_delta(hll.count(), 1000);
    }
}