amadeus_streaming/
distinct.rs

1// This file includes source code from https://github.com/jedisct1/rust-hyperloglog/blob/36d73a2c0a324f4122d32febdb19dd4a815147f0/src/hyperloglog/lib.rs under the following BSD 2-Clause "Simplified" License:
2//
3// Copyright (c) 2013-2016, Frank Denis
4// All rights reserved.
5//
6// Redistribution and use in source and binary forms, with or without modification,
7// are permitted provided that the following conditions are met:
8//
9//   Redistributions of source code must retain the above copyright notice, this
10//   list of conditions and the following disclaimer.
11//
12//   Redistributions in binary form must reproduce the above copyright notice, this
13//   list of conditions and the following disclaimer in the documentation and/or
14//   other materials provided with the distribution.
15//
16// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
20// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
23// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27// This file includes source code from https://github.com/codahale/sketchy/blob/09e9ede8ac27e6fd37d5c5f53ac9b7776c37bc19/src/hyperloglog.rs under the following Apache License 2.0:
28//
29// Copyright (c) 2015-2017 Coda Hale
30//
31// Licensed under the Apache License, Version 2.0 (the "License");
32// you may not use this file except in compliance with the License.
33// You may obtain a copy of the License at
34//
35//   http://www.apache.org/licenses/LICENSE-2.0
36//
37// Unless required by applicable law or agreed to in writing, software
38// distributed under the License is distributed on an "AS IS" BASIS,
39// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
40// See the License for the specific language governing permissions and
41// limitations under the License.
42
43// https://github.com/twitter/algebird/blob/5fdb079447271a5fe0f1fba068e5f86591ccde36/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
44// https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD countApproxDistinct
45// is_x86_feature_detected ?
46
47#[cfg(feature = "protobuf")]
48use crate::proto_util;
49#[cfg(feature = "protobuf")]
50use protobuf::CodedOutputStream;
51
52use serde::{Deserialize, Serialize};
53use std::{
54	cmp::{self, Ordering}, convert::{identity, TryFrom}, fmt, hash::{Hash, Hasher}, marker::PhantomData, ops::{self, Range}
55};
56use twox_hash::XxHash;
57
58#[cfg(feature = "protobuf")]
59use std::convert::TryInto;
60
61use super::{f64_to_u8, u64_to_f64, usize_to_f64};
62use crate::traits::{Intersect, IntersectPlusUnionIsPlus, New, UnionAssign};
63
64mod consts;
65use self::consts::{BIAS_DATA, RAW_ESTIMATE_DATA, TRESHOLD_DATA};
66
67/// Like [`HyperLogLog`] but implements `Ord` and `Eq` by using the estimate of the cardinality.
68#[derive(Serialize, Deserialize)]
69#[serde(bound = "")]
70pub struct HyperLogLogMagnitude<V>(HyperLogLog<V>);
71impl<V: Hash> Ord for HyperLogLogMagnitude<V> {
72	#[inline(always)]
73	fn cmp(&self, other: &Self) -> Ordering {
74		self.0.len().partial_cmp(&other.0.len()).unwrap()
75	}
76}
77impl<V: Hash> PartialOrd for HyperLogLogMagnitude<V> {
78	#[inline(always)]
79	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
80		self.0.len().partial_cmp(&other.0.len())
81	}
82}
83impl<V: Hash> PartialEq for HyperLogLogMagnitude<V> {
84	#[inline(always)]
85	fn eq(&self, other: &Self) -> bool {
86		self.0.len().eq(&other.0.len())
87	}
88}
89impl<V: Hash> Eq for HyperLogLogMagnitude<V> {}
90impl<V: Hash> Clone for HyperLogLogMagnitude<V> {
91	fn clone(&self) -> Self {
92		Self(self.0.clone())
93	}
94}
95impl<V: Hash> New for HyperLogLogMagnitude<V> {
96	type Config = f64;
97	fn new(config: &Self::Config) -> Self {
98		Self(New::new(config))
99	}
100}
101impl<V: Hash> Intersect for HyperLogLogMagnitude<V> {
102	fn intersect<'a>(iter: impl Iterator<Item = &'a Self>) -> Option<Self>
103	where
104		Self: Sized + 'a,
105	{
106		Intersect::intersect(iter.map(|x| &x.0)).map(Self)
107	}
108}
109impl<'a, V: Hash> UnionAssign<&'a HyperLogLogMagnitude<V>> for HyperLogLogMagnitude<V> {
110	fn union_assign(&mut self, rhs: &'a Self) {
111		self.0.union_assign(&rhs.0)
112	}
113}
114impl<'a, V: Hash> ops::AddAssign<&'a V> for HyperLogLogMagnitude<V> {
115	fn add_assign(&mut self, rhs: &'a V) {
116		self.0.add_assign(rhs)
117	}
118}
119impl<'a, V: Hash> ops::AddAssign<&'a Self> for HyperLogLogMagnitude<V> {
120	fn add_assign(&mut self, rhs: &'a Self) {
121		self.0.add_assign(&rhs.0)
122	}
123}
124impl<V: Hash> fmt::Debug for HyperLogLogMagnitude<V> {
125	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
126		self.0.fmt(fmt)
127	}
128}
129impl<V> IntersectPlusUnionIsPlus for HyperLogLogMagnitude<V> {
130	const VAL: bool = <HyperLogLog<V> as IntersectPlusUnionIsPlus>::VAL;
131}
132
133/// An implementation of the [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog) data structure with *bias correction*.
134///
135/// See [*HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm*](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) and [*HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm*](https://ai.google/research/pubs/pub40671) for background on HyperLogLog with bias correction.
136#[derive(Serialize, Deserialize)]
137#[serde(bound = "")]
138pub struct HyperLogLog<V: ?Sized> {
139	alpha: f64,
140	zero: usize,
141	sum: f64,
142	p: u8,
143	m: Box<[u8]>,
144	marker: PhantomData<fn(V)>,
145}
146
147impl<V: ?Sized> HyperLogLog<V>
148where
149	V: Hash,
150{
151	/// Create an empty `HyperLogLog` data structure with the specified error tolerance.
152	pub fn new(error_rate: f64) -> Self {
153		assert!(0.0 < error_rate && error_rate < 1.0);
154		let p = f64_to_u8((f64::log2(1.04 / error_rate) * 2.0).ceil());
155		assert!(0 < p && p < 64);
156		let alpha = Self::get_alpha(p);
157		Self {
158			alpha,
159			zero: 1 << p,
160			sum: f64::from(1 << p),
161			p,
162			m: vec![0; 1 << p].into_boxed_slice(),
163			marker: PhantomData,
164		}
165	}
166
167	/// Create an empty `HyperLogLog` data structure, copying the error tolerance from `hll`.
168	pub fn new_from(hll: &Self) -> Self {
169		Self {
170			alpha: hll.alpha,
171			zero: hll.m.len(),
172			sum: usize_to_f64(hll.m.len()),
173			p: hll.p,
174			m: vec![0; hll.m.len()].into_boxed_slice(),
175			marker: PhantomData,
176		}
177	}
178
179	/// "Visit" an element.
180	#[inline]
181	pub fn push(&mut self, value: &V) {
182		let mut hasher = XxHash::default();
183		value.hash(&mut hasher);
184		let x = hasher.finish();
185		let j = x & (self.m.len() as u64 - 1);
186		let w = x >> self.p;
187		let rho = Self::get_rho(w, 64 - self.p);
188		let mjr = &mut self.m[usize::try_from(j).unwrap()];
189		let old = *mjr;
190		let new = cmp::max(old, rho);
191		self.zero -= if old == 0 { 1 } else { 0 };
192
193		// see pow_bithack()
194		self.sum -= f64::from_bits(u64::max_value().wrapping_sub(u64::from(old)) << 54 >> 2)
195			- f64::from_bits(u64::max_value().wrapping_sub(u64::from(new)) << 54 >> 2);
196
197		*mjr = new;
198	}
199
200	/// Retrieve an estimate of the carginality of the stream.
201	pub fn len(&self) -> f64 {
202		let v = self.zero;
203		if v > 0 {
204			let h =
205				usize_to_f64(self.m.len()) * (usize_to_f64(self.m.len()) / usize_to_f64(v)).ln();
206			if h <= Self::get_threshold(self.p - 4) {
207				return h;
208			}
209		}
210		self.ep()
211	}
212
213	/// Returns true if empty.
214	pub fn is_empty(&self) -> bool {
215		self.zero == self.m.len()
216	}
217
218	/// Merge another HyperLogLog data structure into `self`.
219	///
220	/// This is the same as an HLL approximating cardinality of the union of two multisets.
221	pub fn union(&mut self, src: &Self) {
222		assert_eq!(src.alpha, self.alpha);
223		assert_eq!(src.p, self.p);
224		assert_eq!(src.m.len(), self.m.len());
225		#[cfg(all(
226			feature = "packed_simd",
227			any(target_arch = "x86", target_arch = "x86_64")
228		))]
229		{
230			assert_eq!(self.m.len() % u8s::lanes(), 0); // TODO: high error rate can trigger this
231			assert_eq!(u8s::lanes(), f32s::lanes() * 4);
232			assert_eq!(f32s::lanes(), u32s::lanes());
233			assert_eq!(u8sq::lanes(), u32s::lanes());
234			let mut zero = u8s_sad_out::splat(0);
235			let mut sum = f32s::splat(0.0);
236			for i in (0..self.m.len()).step_by(u8s::lanes()) {
237				unsafe {
238					let self_m = u8s::from_slice_unaligned_unchecked(self.m.get_unchecked(i..));
239					let src_m = u8s::from_slice_unaligned_unchecked(src.m.get_unchecked(i..));
240					let res = self_m.max(src_m);
241					res.write_to_slice_unaligned_unchecked(self.m.get_unchecked_mut(i..));
242					let count: u8s = u8s::splat(0) - u8s::from_bits(res.eq(u8s::splat(0)));
243					let count2 = Sad::<u8s>::sad(count, u8s::splat(0));
244					zero += count2;
245					for j in 0..4 {
246						let x = u8sq::from_slice_unaligned_unchecked(
247							self.m.get_unchecked(i + j * u8sq::lanes()..),
248						);
249						let x: u32s = x.cast();
250						let x: f32s = ((u32s::splat(u32::max_value()) - x) << 25 >> 2).into_bits();
251						sum += x;
252					}
253				}
254			}
255			self.zero = usize::try_from(zero.wrapping_sum()).unwrap();
256			self.sum = f64::from(sum.sum());
257			// https://github.com/AdamNiederer/faster/issues/37
258			// (src.m.simd_iter(faster::u8s(0)),self.m.simd_iter_mut(faster::u8s(0))).zip()
259		}
260		#[cfg(not(all(
261			feature = "packed_simd",
262			any(target_arch = "x86", target_arch = "x86_64")
263		)))]
264		{
265			let mut zero = 0;
266			let mut sum = 0.0;
267			for (to, from) in self.m.iter_mut().zip(src.m.iter()) {
268				*to = (*to).max(*from);
269				zero += if *to == 0 { 1 } else { 0 };
270				sum += f64::from_bits(u64::max_value().wrapping_sub(u64::from(*to)) << 54 >> 2);
271			}
272			self.zero = zero;
273			self.sum = sum;
274		}
275	}
276
277	/// Intersect another HyperLogLog data structure into `self`.
278	///
279	/// Note: This is different to an HLL approximating cardinality of the intersection of two multisets.
280	pub fn intersect(&mut self, src: &Self) {
281		assert_eq!(src.alpha, self.alpha);
282		assert_eq!(src.p, self.p);
283		assert_eq!(src.m.len(), self.m.len());
284		#[cfg(all(
285			feature = "packed_simd",
286			any(target_arch = "x86", target_arch = "x86_64")
287		))]
288		{
289			assert_eq!(self.m.len() % u8s::lanes(), 0);
290			assert_eq!(u8s::lanes(), f32s::lanes() * 4);
291			assert_eq!(f32s::lanes(), u32s::lanes());
292			assert_eq!(u8sq::lanes(), u32s::lanes());
293			let mut zero = u8s_sad_out::splat(0);
294			let mut sum = f32s::splat(0.0);
295			for i in (0..self.m.len()).step_by(u8s::lanes()) {
296				unsafe {
297					let self_m = u8s::from_slice_unaligned_unchecked(self.m.get_unchecked(i..));
298					let src_m = u8s::from_slice_unaligned_unchecked(src.m.get_unchecked(i..));
299					let res = self_m.min(src_m);
300					res.write_to_slice_unaligned_unchecked(self.m.get_unchecked_mut(i..));
301					let count: u8s = u8s::splat(0) - u8s::from_bits(res.eq(u8s::splat(0)));
302					let count2 = Sad::<u8s>::sad(count, u8s::splat(0));
303					zero += count2;
304					for j in 0..4 {
305						let x = u8sq::from_slice_unaligned_unchecked(
306							self.m.get_unchecked(i + j * u8sq::lanes()..),
307						);
308						let x: u32s = x.cast();
309						let x: f32s = ((u32s::splat(u32::max_value()) - x) << 25 >> 2).into_bits();
310						sum += x;
311					}
312				}
313			}
314			self.zero = usize::try_from(zero.wrapping_sum()).unwrap();
315			self.sum = f64::from(sum.sum());
316		}
317		#[cfg(not(all(
318			feature = "packed_simd",
319			any(target_arch = "x86", target_arch = "x86_64")
320		)))]
321		{
322			let mut zero = 0;
323			let mut sum = 0.0;
324			for (to, from) in self.m.iter_mut().zip(src.m.iter()) {
325				*to = (*to).min(*from);
326				zero += if *to == 0 { 1 } else { 0 };
327				sum += f64::from_bits(u64::max_value().wrapping_sub(u64::from(*to)) << 54 >> 2);
328			}
329			self.zero = zero;
330			self.sum = sum;
331		}
332	}
333
334	/// Clears the `HyperLogLog` data structure, as if it was new.
335	pub fn clear(&mut self) {
336		self.zero = self.m.len();
337		self.sum = usize_to_f64(self.m.len());
338		self.m.iter_mut().for_each(|x| {
339			*x = 0;
340		});
341	}
342
343	/// Returns a serialized representation of this HLL directly by BigQuery.
344	/// See [Zetasketch from Google](https://github.com/google/zetasketch) for more details on the serialization format.
345	/// See [HLL functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/hll_functions) in BigQuery
346	/// to understand how to store and use the sketches generated by this function.
347	/// Note: this serialization  function doesn't support the sparse format as defined in Zetasketch mostly because
348	/// this HyperLogLogPlusPlus implementation doesn't support it.
349	#[cfg(feature = "protobuf")]
350	pub fn serialize_as_proto(&self) -> Vec<u8> {
351		let mut buf = Vec::new();
352		let mut stream = CodedOutputStream::new(&mut buf);
353
354		// We use the NoTag write methods for consistency with the parsing functions and for
355		// consistency with the variable-length writes where we can't use any convenience function.
356		stream.write_uint32_no_tag(proto_util::TYPE_TAG).unwrap();
357		stream
358			.write_enum_no_tag(proto_util::AGGREGATION_TYPE)
359			.unwrap();
360
361		stream
362			.write_uint32_no_tag(proto_util::NUM_VALUES_TAG)
363			.unwrap();
364		// Should be the number of values inserted into this HLL, but BigQuery doesn't really care and this
365		// implementation doesn't maintain this value.
366		stream.write_int64_no_tag(0).unwrap();
367
368		stream
369			.write_uint32_no_tag(proto_util::ENCODING_VERSION_TAG)
370			.unwrap();
371		stream
372			.write_int32_no_tag(proto_util::HYPERLOGLOG_PLUS_PLUS_ENCODING_VERSION)
373			.unwrap();
374
375		stream
376			.write_uint32_no_tag(proto_util::VALUE_TYPE_TAG)
377			.unwrap();
378		stream
379			.write_enum_no_tag(proto_util::BYTES_OR_UTF8_STRING_TYPE)
380			.unwrap();
381
382		let hll_size = self.serialized_hll_size();
383		stream
384			.write_uint32_no_tag(proto_util::HYPERLOGLOGPLUS_UNIQUE_STATE_TAG)
385			.unwrap();
386		stream
387			.write_uint32_no_tag(hll_size.try_into().unwrap())
388			.unwrap();
389
390		stream
391			.write_uint32_no_tag(proto_util::PRECISION_OR_NUM_BUCKETS_TAG)
392			.unwrap();
393		stream.write_int32_no_tag(i32::from(self.p)).unwrap();
394
395		stream.write_uint32_no_tag(proto_util::DATA_TAG).unwrap();
396		stream
397			.write_uint32_no_tag(self.m.len().try_into().unwrap())
398			.unwrap();
399		stream.write_raw_bytes(&self.m).unwrap();
400
401		stream.flush().unwrap();
402
403		buf
404	}
405
406	#[cfg(feature = "protobuf")]
407	fn serialized_hll_size(&self) -> usize {
408		let mut size = 0_usize;
409
410		size += proto_util::compute_uint32_size_no_tag(proto_util::PRECISION_OR_NUM_BUCKETS_TAG);
411		size += proto_util::compute_int32_size_no_tag(i32::from(self.p));
412
413		let data_length = self.m.len();
414		size += proto_util::compute_uint32_size_no_tag(proto_util::DATA_TAG);
415		size += proto_util::compute_uint32_size_no_tag(data_length.try_into().unwrap());
416		size += data_length;
417
418		size
419	}
420
421	fn get_threshold(p: u8) -> f64 {
422		TRESHOLD_DATA[p as usize]
423	}
424
425	fn get_alpha(p: u8) -> f64 {
426		assert!((4..=16).contains(&p));
427		match p {
428			4 => 0.673,
429			5 => 0.697,
430			6 => 0.709,
431			_ => 0.7213 / (1.0 + 1.079 / u64_to_f64(1_u64 << p)),
432		}
433	}
434
435	fn get_rho(w: u64, max_width: u8) -> u8 {
436		let rho = max_width - (64 - u8::try_from(w.leading_zeros()).unwrap()) + 1;
437		assert!(0 < rho && rho < 65);
438		rho
439	}
440
441	fn estimate_bias(e: f64, p: u8) -> f64 {
442		let bias_vector = BIAS_DATA[(p - 4) as usize];
443		let neighbors = Self::get_nearest_neighbors(e, RAW_ESTIMATE_DATA[(p - 4) as usize]);
444		assert_eq!(neighbors.len(), 6);
445		bias_vector[neighbors].iter().sum::<f64>() / 6.0_f64
446	}
447
448	fn get_nearest_neighbors(e: f64, estimate_vector: &[f64]) -> Range<usize> {
449		let index = estimate_vector
450			.binary_search_by(|a| a.partial_cmp(&e).unwrap_or(Ordering::Equal))
451			.unwrap_or_else(identity);
452
453		let mut min = if index > 6 { index - 6 } else { 0 };
454		let mut max = cmp::min(index + 6, estimate_vector.len());
455
456		while max - min != 6 {
457			let (min_val, max_val) = unsafe {
458				(
459					*estimate_vector.get_unchecked(min),
460					*estimate_vector.get_unchecked(max - 1),
461				)
462			};
463			// assert!(min_val <= e && e <= max_val);
464			if 2.0 * e - min_val > max_val {
465				min += 1;
466			} else {
467				max -= 1;
468			}
469		}
470
471		min..max
472	}
473
474	fn ep(&self) -> f64 {
475		let e = self.alpha * usize_to_f64(self.m.len() * self.m.len()) / self.sum;
476		if e <= usize_to_f64(5 * self.m.len()) {
477			e - Self::estimate_bias(e, self.p)
478		} else {
479			e
480		}
481	}
482}
483
484impl<V: ?Sized> Clone for HyperLogLog<V> {
485	fn clone(&self) -> Self {
486		Self {
487			alpha: self.alpha,
488			zero: self.zero,
489			sum: self.sum,
490			p: self.p,
491			m: self.m.clone(),
492			marker: PhantomData,
493		}
494	}
495}
496impl<V: ?Sized> fmt::Debug for HyperLogLog<V>
497where
498	V: Hash,
499{
500	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
501		fmt.debug_struct("HyperLogLog")
502			.field("len", &self.len())
503			.finish()
504	}
505}
506impl<V: ?Sized> New for HyperLogLog<V>
507where
508	V: Hash,
509{
510	type Config = f64;
511	fn new(config: &Self::Config) -> Self {
512		Self::new(*config)
513	}
514}
515impl<V: ?Sized> Intersect for HyperLogLog<V>
516where
517	V: Hash,
518{
519	fn intersect<'a>(mut iter: impl Iterator<Item = &'a Self>) -> Option<Self>
520	where
521		Self: Sized + 'a,
522	{
523		let mut ret = iter.next()?.clone();
524		iter.for_each(|x| {
525			ret.intersect(x);
526		});
527		Some(ret)
528	}
529}
530impl<'a, V: ?Sized> UnionAssign<&'a HyperLogLog<V>> for HyperLogLog<V>
531where
532	V: Hash,
533{
534	fn union_assign(&mut self, rhs: &'a Self) {
535		self.union(rhs)
536	}
537}
538impl<'a, V: ?Sized> ops::AddAssign<&'a V> for HyperLogLog<V>
539where
540	V: Hash,
541{
542	fn add_assign(&mut self, rhs: &'a V) {
543		self.push(rhs)
544	}
545}
546impl<'a, V: ?Sized> ops::AddAssign<&'a Self> for HyperLogLog<V>
547where
548	V: Hash,
549{
550	fn add_assign(&mut self, rhs: &'a Self) {
551		self.union(rhs)
552	}
553}
554impl<V: ?Sized> IntersectPlusUnionIsPlus for HyperLogLog<V> {
555	const VAL: bool = true;
556}
557
558#[cfg(all(
559	feature = "packed_simd",
560	any(target_arch = "x86", target_arch = "x86_64")
561))]
562mod simd {
563	pub use packed_simd::{self, Cast, FromBits, IntoBits};
564	use std::marker::PhantomData;
565
566	#[cfg(target_feature = "avx512bw")] // TODO
567	mod simd_types {
568		use super::packed_simd;
569		pub type u8s = packed_simd::u8x64;
570		pub type u8s_sad_out = packed_simd::u64x8;
571		pub type f32s = packed_simd::f32x16;
572		pub type u32s = packed_simd::u32x16;
573		pub type u8sq = packed_simd::u8x16;
574	}
575	#[cfg(target_feature = "avx2")]
576	mod simd_types {
577		#![allow(non_camel_case_types)]
578		use super::packed_simd;
579		pub type u8s = packed_simd::u8x32;
580		pub type u8s_sad_out = packed_simd::u64x4;
581		pub type f32s = packed_simd::f32x8;
582		pub type u32s = packed_simd::u32x8;
583		pub type u8sq = packed_simd::u8x8;
584	}
585	#[cfg(all(not(target_feature = "avx2"), target_feature = "sse2"))]
586	mod simd_types {
587		#![allow(non_camel_case_types)]
588		use super::packed_simd;
589		pub type u8s = packed_simd::u8x16;
590		pub type u8s_sad_out = packed_simd::u64x2;
591		pub type f32s = packed_simd::f32x4;
592		pub type u32s = packed_simd::u32x4;
593		pub type u8sq = packed_simd::u8x4;
594	}
595	#[cfg(all(not(target_feature = "avx2"), not(target_feature = "sse2")))]
596	mod simd_types {
597		#![allow(non_camel_case_types)]
598		use super::packed_simd;
599		pub type u8s = packed_simd::u8x8;
600		pub type u8s_sad_out = u64;
601		pub type f32s = packed_simd::f32x2;
602		pub type u32s = packed_simd::u32x2;
603		pub type u8sq = packed_simd::u8x2;
604	}
605	pub use self::simd_types::{f32s, u32s, u8s, u8s_sad_out, u8sq};
606
607	pub struct Sad<X>(PhantomData<fn(X)>);
608	#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
609	mod x86 {
610		#[cfg(target_arch = "x86")]
611		pub use std::arch::x86::*;
612		#[cfg(target_arch = "x86_64")]
613		pub use std::arch::x86_64::*;
614	}
615	// TODO
616	// #[cfg(target_feature = "avx512bw")]
617	// impl Sad<packed_simd::u8x64> {
618	// 	#[inline]
619	// 	#[target_feature(enable = "avx512bw")]
620	// 	pub unsafe fn sad(a: packed_simd::u8x64, b: packed_simd::u8x64) -> packed_simd::u64x8 {
621	// 		use std::mem::transmute;
622	// 		packed_simd::Simd(transmute(x86::_mm512_sad_epu8(transmute(a.0), transmute(b.0))))
623	// 	}
624	// }
625	#[cfg(target_feature = "avx2")]
626	impl Sad<packed_simd::u8x32> {
627		#[inline]
628		#[target_feature(enable = "avx2")]
629		pub unsafe fn sad(a: packed_simd::u8x32, b: packed_simd::u8x32) -> packed_simd::u64x4 {
630			use std::mem::transmute;
631			packed_simd::Simd(transmute(x86::_mm256_sad_epu8(
632				transmute(a.0),
633				transmute(b.0),
634			)))
635		}
636	}
637	#[cfg(target_feature = "sse2")]
638	impl Sad<packed_simd::u8x16> {
639		#[inline]
640		#[target_feature(enable = "sse2")]
641		pub unsafe fn sad(a: packed_simd::u8x16, b: packed_simd::u8x16) -> packed_simd::u64x2 {
642			use std::mem::transmute;
643			packed_simd::Simd(transmute(x86::_mm_sad_epu8(transmute(a.0), transmute(b.0))))
644		}
645	}
646	#[cfg(target_feature = "sse,mmx")]
647	impl Sad<packed_simd::u8x8> {
648		#[inline]
649		#[target_feature(enable = "sse,mmx")]
650		pub unsafe fn sad(a: packed_simd::u8x8, b: packed_simd::u8x8) -> u64 {
651			use std::mem::transmute;
652			transmute(x86::_mm_sad_pu8(transmute(a.0), transmute(b.0)))
653		}
654	}
655	#[cfg(not(target_feature = "sse,mmx"))]
656	impl Sad<packed_simd::u8x8> {
657		#[inline(always)]
658		pub unsafe fn sad(a: packed_simd::u8x8, b: packed_simd::u8x8) -> u64 {
659			assert_eq!(b, packed_simd::u8x8::splat(0));
660			(0..8).map(|i| u64::from(a.extract(i))).sum()
661		}
662	}
663}
664#[cfg(all(
665	feature = "packed_simd",
666	any(target_arch = "x86", target_arch = "x86_64")
667))]
668use simd::{f32s, u32s, u8s, u8s_sad_out, u8sq, Cast, FromBits, IntoBits, Sad};
669
670#[cfg(test)]
671mod test {
672	use super::{super::f64_to_usize, HyperLogLog};
673	use std::f64;
674
675	#[test]
676	fn pow_bithack() {
677		// build the float from x, manipulating it to be the mantissa we want.
678		// no portability issues in theory https://doc.rust-lang.org/stable/std/primitive.f64.html#method.from_bits
679		for x in 0_u8..65 {
680			let a = 2.0_f64.powi(-(i32::from(x)));
681			let b = f64::from_bits(u64::max_value().wrapping_sub(u64::from(x)) << 54 >> 2);
682			let c = f32::from_bits(u32::max_value().wrapping_sub(u32::from(x)) << 25 >> 2);
683			assert_eq!(a, b);
684			assert_eq!(a, f64::from(c));
685		}
686	}
687
688	#[test]
689	fn hyperloglog_test_simple() {
690		let mut hll = HyperLogLog::new(0.00408);
691		let keys = ["test1", "test2", "test3", "test2", "test2", "test2"];
692		for k in &keys {
693			hll.push(k);
694		}
695		assert!((hll.len().round() - 3.0).abs() < f64::EPSILON);
696		assert!(!hll.is_empty());
697		hll.clear();
698		assert!(hll.is_empty());
699		assert!(hll.len() == 0.0);
700	}
701
702	#[test]
703	fn hyperloglog_test_merge() {
704		let mut hll = HyperLogLog::new(0.00408);
705		let keys = ["test1", "test2", "test3", "test2", "test2", "test2"];
706		for k in &keys {
707			hll.push(k);
708		}
709		assert!((hll.len().round() - 3.0).abs() < f64::EPSILON);
710
711		let mut hll2 = HyperLogLog::new_from(&hll);
712		let keys2 = ["test3", "test4", "test4", "test4", "test4", "test1"];
713		for k in &keys2 {
714			hll2.push(k);
715		}
716		assert!((hll2.len().round() - 3.0).abs() < f64::EPSILON);
717
718		hll.union(&hll2);
719		assert!((hll.len().round() - 4.0).abs() < f64::EPSILON);
720	}
721
722	#[test]
723	#[cfg_attr(miri, ignore)]
724	fn push() {
725		let actual = 100_000.0;
726		let p = 0.05;
727		let mut hll = HyperLogLog::new(p);
728		for i in 0..f64_to_usize(actual) {
729			hll.push(&i);
730		}
731
732		// assert_eq!(111013.12482663046, hll.len());
733
734		assert!(hll.len() > (actual - (actual * p * 3.0)));
735		assert!(hll.len() < (actual + (actual * p * 3.0)));
736	}
737
738	#[test]
739	#[cfg(feature = "protobuf")]
740	fn hyperloglog_test_proto() {
741		let p = 0.05;
742		let mut hll = HyperLogLog::new(p);
743		let actual = 10_000;
744
745		for i in 0..actual {
746			hll.push(&format!("test-{}", i * 10 + i));
747		}
748
749		assert!(hll.len() > (f64::from(actual) - (f64::from(actual) * p * 3.0)));
750		assert!(hll.len() < (f64::from(actual) + (f64::from(actual) * p * 3.0)));
751
752		// Check on bigQuery with the following query: SELECT HLL_COUNT.EXTRACT(FROM_HEX("<hex_value>"))
753		let expected_ser = "087010001802200b8207850418092a800405080409040407050a0407060507040303080304070504090f08060604060503050505030904070506060904040608060505040706040905040706070b090304040609060505070406060508060505070604050404040507050409040502090605050a0505060203050605080a0605060505040404050705050304060506070608070304050306060304080604060a0704060904070804050706080704080409040604060607040d040604060304040503060406070606050308050606050409040707050605070305040707050305070a06070905040506060905050504060804030605050505080703060c03040405040a03070405090303030204030703060407040404050608050504060604050505090404060504050603080705040606060405050405040506060404050308040604080a06040607040606040707080405040505040603050607060508050307060706070305050403040304050504030407080506040407040704050605060a06050b0607030603050406050506030504050705040504090606030504040704030704030805030504050706090407070604070405060307060a08050507070605090407030604040404030706040405040804050306040504040506020505040607060604060808060508030605030604050b04080a0904050506060405060806070606080605040606040606060607";
754		assert_eq!(hex::encode(&hll.serialize_as_proto()), expected_ser);
755	}
756}