// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! # HyperLogLog
//!
//! `hyperloglog` is a module that contains a modified version
//! of [redis's implementation](https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c)
//! with some modifications based on strong assumptions about its usage
//! within datafusion, so that its functions can
//! be efficiently implemented.
//!
//! Specifically, like Redis's version, this HLL structure uses
//! 2**14 = 16384 registers, which means the standard error is
//! 1.04/(16384**0.5) = 0.8125%. Unlike Redis, each register takes
//! up a full [`u8`] instead of a raw int* and thus avoids some
//! tricky bit shifting techniques used in the original version.
//! This results in a memory usage increase from 12KiB to 16KiB.
//! Also only the dense version is adopted, so there's no automatic
//! conversion, largely to simplify the code.
//!
//! This module also borrows some code structure from [pdatastructs.rs](https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs).
3637use ahash::RandomState;
38use std::hash::Hash;
39use std::marker::PhantomData;
/// Precision parameter: the number of low hash bits used to select a register.
/// The greater `HLL_P` is, the smaller the error.
const HLL_P: usize = 14_usize;
/// The number of remaining hash bits, i.e. the bits that are scanned for the
/// run of zeros recorded in a register.
const HLL_Q: usize = 64_usize - HLL_P;
/// Total number of registers (`2^HLL_P`).
const NUM_REGISTERS: usize = 1_usize << HLL_P;
/// Mask to obtain index into the registers
const HLL_P_MASK: u64 = (NUM_REGISTERS as u64) - 1;

// A register stores a rank in `1..=HLL_Q + 1` as a `u8`; refuse to compile if
// a change to the constants above would make that value overflow a byte.
const _: () = assert!(
    HLL_Q + 1 <= u8::MAX as usize,
    "largest possible rank must fit in a u8 register"
);
4849#[derive(Clone, Debug)]
50pub(crate) struct HyperLogLog<T>
51where
52T: Hash + ?Sized,
53{
54 registers: [u8; NUM_REGISTERS],
55 phantom: PhantomData<T>,
56}
5758/// Fixed seed for the hashing so that values are consistent across runs
59///
60/// Note that when we later move on to have serialized HLL register binaries
61/// shared across cluster, this SEED will have to be consistent across all
62/// parties otherwise we might have corruption. So ideally for later this seed
63/// shall be part of the serialized form (or stay unchanged across versions).
64const SEED: RandomState = RandomState::with_seeds(
650x885f6cab121d01a3_u64,
660x71e4379f2976ad8f_u64,
670xbf30173dd28a8816_u64,
680x0eaea5d736d733a4_u64,
69);
7071impl<T> Default for HyperLogLog<T>
72where
73T: Hash + ?Sized,
74{
75fn default() -> Self {
76Self::new()
77 }
78}
7980impl<T> HyperLogLog<T>
81where
82T: Hash + ?Sized,
83{
84/// Creates a new, empty HyperLogLog.
85pub fn new() -> Self {
86let registers = [0; NUM_REGISTERS];
87Self::new_with_registers(registers)
88 }
8990/// Creates a HyperLogLog from already populated registers
91 /// note that this method should not be invoked in untrusted environment
92 /// because the internal structure of registers are not examined.
93pub(crate) fn new_with_registers(registers: [u8; NUM_REGISTERS]) -> Self {
94Self {
95 registers,
96 phantom: PhantomData,
97 }
98 }
99100/// choice of hash function: ahash is already an dependency
101 /// and it fits the requirements of being a 64bit hash with
102 /// reasonable performance.
103#[inline]
104fn hash_value(&self, obj: &T) -> u64 {
105 SEED.hash_one(obj)
106 }
107108/// Adds an element to the HyperLogLog.
109pub fn add(&mut self, obj: &T) {
110let hash = self.hash_value(obj);
111let index = (hash & HLL_P_MASK) as usize;
112let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
113self.registers[index] = self.registers[index].max(p as u8);
114 }
115116/// Get the register histogram (each value in register index into
117 /// the histogram; u32 is enough because we only have 2**14=16384 registers
118#[inline]
119fn get_histogram(&self) -> [u32; HLL_Q + 2] {
120let mut histogram = [0; HLL_Q + 2];
121// hopefully this can be unrolled
122for r in self.registers {
123 histogram[r as usize] += 1;
124 }
125 histogram
126 }
127128/// Merge the other [`HyperLogLog`] into this one
129pub fn merge(&mut self, other: &HyperLogLog<T>) {
130assert!(
131self.registers.len() == other.registers.len(),
132"unexpected got unequal register size, expect {}, got {}",
133self.registers.len(),
134 other.registers.len()
135 );
136for i in 0..self.registers.len() {
137self.registers[i] = self.registers[i].max(other.registers[i]);
138 }
139 }
140141/// Guess the number of unique elements seen by the HyperLogLog.
142pub fn count(&self) -> usize {
143let histogram = self.get_histogram();
144let m = NUM_REGISTERS as f64;
145let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m);
146for i in histogram[1..=HLL_Q].iter().rev() {
147 z += *i as f64;
148 z *= 0.5;
149 }
150 z += m * hll_sigma(histogram[0] as f64 / m);
151 (0.5 / 2_f64.ln() * m * m / z).round() as usize
152 }
153}
/// Helper function sigma as defined in
/// "New cardinality estimation algorithms for HyperLogLog sketches"
/// Otmar Ertl, arXiv:1702.01284
///
/// Evaluates `x + sum_{k >= 1} x^(2^k) * 2^(k - 1)` by adding terms until the
/// running sum no longer changes in `f64` precision.
///
/// `x` must lie in `[0, 1]`. Returns `f64::INFINITY` for `x == 1`.
#[inline]
fn hll_sigma(x: f64) -> f64 {
    // NaN would never satisfy the convergence test below and spin forever;
    // surface such a caller bug in debug builds.
    debug_assert!(
        (0.0..=1.0).contains(&x),
        "hll_sigma expects x in [0, 1], got {}",
        x
    );
    if x == 1.0 {
        return f64::INFINITY;
    }
    let mut power = x; // becomes x^(2^k)
    let mut weight = 1.0; // 2^(k - 1)
    let mut sum = x;
    loop {
        power *= power;
        let previous = sum;
        sum += power * weight;
        weight += weight;
        if previous == sum {
            return sum;
        }
    }
}
/// Helper function tau as defined in
/// "New cardinality estimation algorithms for HyperLogLog sketches"
/// Otmar Ertl, arXiv:1702.01284
///
/// Evaluates `(1 - x - sum_{k >= 1} (1 - x^(2^-k))^2 * 2^-k) / 3` by
/// subtracting terms until the running value no longer changes in `f64`
/// precision.
///
/// `x` must lie in `[0, 1]`. Returns `0.0` for both `x == 0` and `x == 1`.
#[inline]
fn hll_tau(x: f64) -> f64 {
    // NaN or a negative input (whose square root is NaN) would never satisfy
    // the convergence test below and spin forever; surface such a caller bug
    // in debug builds.
    debug_assert!(
        (0.0..=1.0).contains(&x),
        "hll_tau expects x in [0, 1], got {}",
        x
    );
    if x == 0.0 || x == 1.0 {
        return 0.0;
    }
    let mut root = x; // becomes x^(2^-k)
    let mut weight = 1.0; // becomes 2^-k
    let mut acc = 1.0 - x;
    loop {
        root = root.sqrt();
        let previous = acc;
        weight *= 0.5;
        acc -= (1.0 - root).powi(2) * weight;
        if previous == acc {
            return acc / 3.0;
        }
    }
}
202203impl<T> AsRef<[u8]> for HyperLogLog<T>
204where
205T: Hash + ?Sized,
206{
207fn as_ref(&self) -> &[u8] {
208&self.registers
209 }
210}
211212impl<T> Extend<T> for HyperLogLog<T>
213where
214T: Hash,
215{
216fn extend<S: IntoIterator<Item = T>>(&mut self, iter: S) {
217for elem in iter {
218self.add(&elem);
219 }
220 }
221}
222223impl<'a, T> Extend<&'a T> for HyperLogLog<T>
224where
225T: 'a + Hash + ?Sized,
226{
227fn extend<S: IntoIterator<Item = &'a T>>(&mut self, iter: S) {
228for elem in iter {
229self.add(elem);
230 }
231 }
232}
#[cfg(test)]
mod tests {
    use super::{HyperLogLog, NUM_REGISTERS};

    /// Asserts that `got` is within the accepted relative error of `expected`.
    fn compare_with_delta(got: usize, expected: usize) {
        let expected = expected as f64;
        let diff = (got as f64) - expected;
        let diff = diff.abs() / expected;
        // times 6 because we want the tests to be stable
        // so we allow a rather large margin of error
        // this is adopted from redis's unit test version as well
        let margin = 1.04 / ((NUM_REGISTERS as f64).sqrt()) * 6.0;
        assert!(
            diff <= margin,
            "{} is not within {:.3}% of {}, i.e. outside of ({}, {})",
            got,
            margin * 100.0,
            expected,
            expected * (1.0 - margin),
            expected * (1.0 + margin)
        );
    }

    /// Adds `0..$SIZE` as values of type `$T` and checks the estimate.
    macro_rules! sized_number_test {
        ($SIZE: expr, $T: tt) => {{
            let mut hll = HyperLogLog::<$T>::new();
            for i in 0..$SIZE {
                hll.add(&i);
            }
            compare_with_delta(hll.count(), $SIZE);
        }};
    }

    /// Runs `sized_number_test!` for the 64 and 128 bit integer types.
    macro_rules! typed_large_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u64);
            sized_number_test!($SIZE, u128);
            sized_number_test!($SIZE, i64);
            sized_number_test!($SIZE, i128);
        }};
    }

    /// Runs `sized_number_test!` for the 16 and 32 bit integer types as well
    /// as the large ones; `$SIZE` must therefore fit in an `i16`.
    macro_rules! typed_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u16);
            sized_number_test!($SIZE, u32);
            sized_number_test!($SIZE, i16);
            sized_number_test!($SIZE, i32);
            typed_large_number_test!($SIZE);
        }};
    }

    #[test]
    fn test_empty() {
        let hll = HyperLogLog::<u64>::new();
        assert_eq!(hll.count(), 0);
    }

    #[test]
    fn test_one() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.add(&1);
        assert_eq!(hll.count(), 1);
    }

    #[test]
    fn test_number_100() {
        typed_number_test!(100);
    }

    #[test]
    fn test_number_1k() {
        typed_number_test!(1_000);
    }

    #[test]
    fn test_number_10k() {
        typed_number_test!(10_000);
    }

    #[test]
    fn test_number_100k() {
        typed_large_number_test!(100_000);
    }

    #[test]
    fn test_number_1m() {
        typed_large_number_test!(1_000_000);
    }

    #[test]
    fn test_u8() {
        let mut hll = HyperLogLog::<[u8]>::new();
        for i in 0..1000 {
            let s = i.to_string();
            let b = s.as_bytes();
            hll.add(b);
        }
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_string() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_empty_merge() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.merge(&HyperLogLog::<u64>::new());
        assert_eq!(hll.count(), 0);
    }

    #[test]
    fn test_merge_overlapped() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));

        let mut other = HyperLogLog::<String>::new();
        other.extend((0..1000).map(|i| i.to_string()));

        hll.merge(&other);
        compare_with_delta(hll.count(), 1000);
    }

    // The two merge tests above would also pass if `merge` did nothing, so
    // check that merging disjoint inputs really adds up.
    #[test]
    fn test_merge_disjoint() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));

        let mut other = HyperLogLog::<String>::new();
        other.extend((1000..2000).map(|i| i.to_string()));

        hll.merge(&other);
        compare_with_delta(hll.count(), 2000);
    }

    #[test]
    fn test_repetition() {
        let mut hll = HyperLogLog::<u32>::new();
        for i in 0..1_000_000 {
            hll.add(&(i % 1000));
        }
        compare_with_delta(hll.count(), 1000);
    }
}