use std::convert::TryInto;
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
use ahash::RandomState;
use arrow::bitmap::utils::get_bit_unchecked;
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use polars_arrow::utils::CustomIterTools;
use polars_utils::HashSingle;
use rayon::prelude::*;
use xxhash_rust::xxh3::xxh3_64_with_seed;
use crate::datatypes::UInt64Chunked;
use crate::prelude::*;
use crate::utils::arrow::array::Array;
use crate::POOL;
pub trait VecHash {
fn vec_hash(&self, _random_state: RandomState, _buf: &mut Vec<u64>) {
unimplemented!()
}
fn vec_hash_combine(&self, _random_state: RandomState, _hashes: &mut [u64]) {
unimplemented!()
}
}
pub(crate) fn get_null_hash_value(random_state: RandomState) -> u64 {
let mut hasher = random_state.build_hasher();
3188347919usize.hash(&mut hasher);
let first = hasher.finish();
let mut hasher = random_state.build_hasher();
first.hash(&mut hasher);
hasher.finish()
}
macro_rules! fx_hash_8_bit {
($val: expr, $k: expr ) => {{
let val = std::mem::transmute::<_, u8>($val);
(val as u64).wrapping_mul($k)
}};
}
macro_rules! fx_hash_16_bit {
($val: expr, $k: expr ) => {{
let val = std::mem::transmute::<_, u16>($val);
(val as u64).wrapping_mul($k)
}};
}
macro_rules! fx_hash_32_bit {
($val: expr, $k: expr ) => {{
let val = std::mem::transmute::<_, u32>($val);
(val as u64).wrapping_mul($k)
}};
}
macro_rules! fx_hash_64_bit {
($val: expr, $k: expr ) => {{
($val as u64).wrapping_mul($k)
}};
}
const FXHASH_K: u64 = 0x517cc1b727220a95;
fn finish_vec_hash<T>(ca: &ChunkedArray<T>, random_state: RandomState, buf: &mut Vec<u64>)
where
T: PolarsIntegerType,
T::Native: Hash,
{
let null_h = get_null_hash_value(random_state);
let hashes = buf.as_mut_slice();
let mut offset = 0;
ca.downcast_iter().for_each(|arr| {
if arr.null_count() > 0 {
let validity = arr.validity().unwrap();
let (slice, byte_offset, _) = validity.as_slice();
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
.zip(&mut hashes[offset..])
.for_each(|(valid, h)| {
*h = [null_h, *h][valid as usize];
})
}
offset += arr.len();
});
}
fn integer_vec_hash_combine<T>(ca: &ChunkedArray<T>, random_state: RandomState, hashes: &mut [u64])
where
T: PolarsIntegerType,
T::Native: Hash,
{
let null_h = get_null_hash_value(random_state.clone());
let mut offset = 0;
ca.downcast_iter().for_each(|arr| {
match arr.null_count() {
0 => arr
.values()
.as_slice()
.iter()
.zip(&mut hashes[offset..])
.for_each(|(v, h)| {
let l = random_state.hash_single(v);
*h = _boost_hash_combine(l, *h)
}),
_ => {
let validity = arr.validity().unwrap();
let (slice, byte_offset, _) = validity.as_slice();
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
.zip(&mut hashes[offset..])
.zip(arr.values().as_slice())
.for_each(|((valid, h), l)| {
*h = _boost_hash_combine(
[null_h, random_state.hash_single(l)][valid as usize],
*h,
)
});
}
}
offset += arr.len();
});
}
macro_rules! vec_hash_int {
($ca:ident, $fx_hash:ident) => {
impl VecHash for $ca {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
buf.clear();
buf.reserve(self.len());
let k = random_state.hash_one(FXHASH_K);
#[allow(unused_unsafe)]
#[allow(clippy::useless_transmute)]
self.downcast_iter().for_each(|arr| {
buf.extend(
arr.values()
.as_slice()
.iter()
.copied()
.map(|v| unsafe { $fx_hash!(v, k) }),
);
});
finish_vec_hash(self, random_state, buf)
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
integer_vec_hash_combine(self, random_state, hashes)
}
}
};
}
vec_hash_int!(Int64Chunked, fx_hash_64_bit);
vec_hash_int!(Int32Chunked, fx_hash_32_bit);
vec_hash_int!(Int16Chunked, fx_hash_16_bit);
vec_hash_int!(Int8Chunked, fx_hash_8_bit);
vec_hash_int!(UInt64Chunked, fx_hash_64_bit);
vec_hash_int!(UInt32Chunked, fx_hash_32_bit);
vec_hash_int!(UInt16Chunked, fx_hash_16_bit);
vec_hash_int!(UInt8Chunked, fx_hash_8_bit);
pub trait VecHashSingle {
fn get_k(random_state: RandomState) -> u64 {
random_state.hash_one(FXHASH_K)
}
fn _vec_hash_single(self, k: u64) -> u64;
}
impl VecHashSingle for i8 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
unsafe { fx_hash_8_bit!(self, k) }
}
}
impl VecHashSingle for u8 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
#[allow(clippy::useless_transmute)]
unsafe {
fx_hash_8_bit!(self, k)
}
}
}
impl VecHashSingle for i16 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
unsafe { fx_hash_16_bit!(self, k) }
}
}
impl VecHashSingle for u16 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
#[allow(clippy::useless_transmute)]
unsafe {
fx_hash_16_bit!(self, k)
}
}
}
impl VecHashSingle for i32 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
unsafe { fx_hash_32_bit!(self, k) }
}
}
impl VecHashSingle for u32 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
#[allow(clippy::useless_transmute)]
unsafe {
fx_hash_32_bit!(self, k)
}
}
}
impl VecHashSingle for i64 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
fx_hash_64_bit!(self, k)
}
}
impl VecHashSingle for u64 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
fx_hash_64_bit!(self, k)
}
}
impl VecHash for Utf8Chunked {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
buf.clear();
buf.reserve(self.len());
let null_h = get_null_hash_value(random_state);
self.downcast_iter().for_each(|arr| {
if arr.null_count() == 0 {
buf.extend(
arr.values_iter()
.map(|v| xxh3_64_with_seed(v.as_bytes(), null_h)),
)
} else {
buf.extend(arr.into_iter().map(|opt_v| match opt_v {
Some(v) => xxh3_64_with_seed(v.as_bytes(), null_h),
None => null_h,
}))
}
});
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
let null_h = get_null_hash_value(random_state);
self.apply_to_slice(
|opt_v, h| {
let l = match opt_v {
Some(v) => xxh3_64_with_seed(v.as_bytes(), null_h),
None => null_h,
};
_boost_hash_combine(l, *h)
},
hashes,
)
}
}
#[cfg(feature = "dtype-binary")]
impl VecHash for BinaryChunked {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
buf.clear();
buf.reserve(self.len());
let null_h = get_null_hash_value(random_state);
self.downcast_iter().for_each(|arr| {
buf.extend(arr.into_iter().map(|opt_v| match opt_v {
Some(v) => xxh3_64_with_seed(v, null_h),
None => null_h,
}))
});
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
let null_h = get_null_hash_value(random_state);
self.apply_to_slice(
|opt_v, h| {
let l = match opt_v {
Some(v) => xxh3_64_with_seed(v, null_h),
None => null_h,
};
_boost_hash_combine(l, *h)
},
hashes,
)
}
}
impl VecHash for BooleanChunked {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
buf.clear();
buf.reserve(self.len());
self.downcast_iter().for_each(|arr| {
buf.extend(arr.into_iter().map(|opt_v| random_state.hash_single(opt_v)))
});
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
self.apply_to_slice(
|opt_v, h| {
let mut hasher = random_state.build_hasher();
opt_v.hash(&mut hasher);
_boost_hash_combine(hasher.finish(), *h)
},
hashes,
)
}
}
impl VecHash for Float32Chunked {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
self.bit_repr_small().vec_hash(random_state, buf)
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
self.bit_repr_small().vec_hash_combine(random_state, hashes)
}
}
impl VecHash for Float64Chunked {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
self.bit_repr_large().vec_hash(random_state, buf)
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
self.bit_repr_large().vec_hash_combine(random_state, hashes)
}
}
#[cfg(feature = "object")]
impl<T> VecHash for ObjectChunked<T>
where
T: PolarsObject,
{
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
buf.clear();
buf.reserve(self.len());
self.downcast_iter().for_each(|arr| {
buf.extend(arr.into_iter().map(|opt_v| {
let mut hasher = random_state.build_hasher();
opt_v.hash(&mut hasher);
hasher.finish()
}))
});
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
self.apply_to_slice(
|opt_v, h| {
let mut hasher = random_state.build_hasher();
opt_v.hash(&mut hasher);
_boost_hash_combine(hasher.finish(), *h)
},
hashes,
)
}
}
pub(crate) trait AsU64 {
#[allow(clippy::wrong_self_convention)]
fn as_u64(self) -> u64;
}
#[cfg(feature = "performant")]
impl AsU64 for u8 {
#[inline]
fn as_u64(self) -> u64 {
self as u64
}
}
#[cfg(feature = "performant")]
impl AsU64 for u16 {
#[inline]
fn as_u64(self) -> u64 {
self as u64
}
}
impl AsU64 for u32 {
#[inline]
fn as_u64(self) -> u64 {
self as u64
}
}
impl AsU64 for u64 {
#[inline]
fn as_u64(self) -> u64 {
self
}
}
impl AsU64 for i32 {
#[inline]
fn as_u64(self) -> u64 {
let asu32: u32 = unsafe { std::mem::transmute(self) };
asu32 as u64
}
}
impl AsU64 for i64 {
#[inline]
fn as_u64(self) -> u64 {
unsafe { std::mem::transmute(self) }
}
}
impl AsU64 for Option<u32> {
#[inline]
fn as_u64(self) -> u64 {
match self {
Some(v) => v as u64,
None => u64::MAX >> 2,
}
}
}
#[cfg(feature = "performant")]
impl AsU64 for Option<u8> {
#[inline]
fn as_u64(self) -> u64 {
match self {
Some(v) => v as u64,
None => u64::MAX >> 2,
}
}
}
#[cfg(feature = "performant")]
impl AsU64 for Option<u16> {
#[inline]
fn as_u64(self) -> u64 {
match self {
Some(v) => v as u64,
None => u64::MAX >> 2,
}
}
}
impl AsU64 for Option<u64> {
#[inline]
fn as_u64(self) -> u64 {
self.unwrap_or(u64::MAX >> 2)
}
}
impl AsU64 for [u8; 9] {
#[inline]
fn as_u64(self) -> u64 {
u64::from_ne_bytes(self[..8].try_into().unwrap())
}
}
const BUILD_HASHER: RandomState = RandomState::with_seeds(0, 0, 0, 0);
impl AsU64 for [u8; 17] {
#[inline]
fn as_u64(self) -> u64 {
BUILD_HASHER.hash_single(self)
}
}
impl AsU64 for [u8; 13] {
#[inline]
fn as_u64(self) -> u64 {
BUILD_HASHER.hash_single(self)
}
}
#[derive(Default)]
pub struct IdHasher {
hash: u64,
}
impl Hasher for IdHasher {
fn finish(&self) -> u64 {
self.hash
}
fn write(&mut self, _bytes: &[u8]) {
unreachable!("IdHasher should only be used for integer keys <= 64 bit precision")
}
fn write_u32(&mut self, i: u32) {
self.write_u64(i as u64)
}
#[inline]
fn write_u64(&mut self, i: u64) {
self.hash = i;
}
fn write_i32(&mut self, i: i32) {
unsafe { self.write_u32(std::mem::transmute::<i32, u32>(i)) }
}
fn write_i64(&mut self, i: i64) {
unsafe { self.write_u64(std::mem::transmute::<i64, u64>(i)) }
}
}
pub type IdBuildHasher = BuildHasherDefault<IdHasher>;
#[derive(Debug)]
pub(crate) struct IdxHash {
pub(crate) idx: IdxSize,
pub(crate) hash: u64,
}
impl Hash for IdxHash {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.hash)
}
}
impl IdxHash {
#[inline]
pub(crate) fn new(idx: IdxSize, hash: u64) -> Self {
IdxHash { idx, hash }
}
}
#[derive(Eq, Copy, Clone, Debug)]
pub(crate) struct BytesHash<'a> {
payload: Option<&'a [u8]>,
hash: u64,
}
impl<'a> Hash for BytesHash<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.hash)
}
}
impl<'a> BytesHash<'a> {
#[inline]
pub(crate) fn new(s: Option<&'a [u8]>, hash: u64) -> Self {
Self { payload: s, hash }
}
#[inline]
pub(crate) fn new_from_str(s: Option<&'a str>, hash: u64) -> Self {
Self {
payload: s.map(|s| s.as_bytes()),
hash,
}
}
}
impl<'a> PartialEq for BytesHash<'a> {
#[inline]
fn eq(&self, other: &Self) -> bool {
(self.hash == other.hash) && (self.payload == other.payload)
}
}
impl<'a> AsU64 for BytesHash<'a> {
fn as_u64(self) -> u64 {
self.hash
}
}
#[inline]
pub(crate) fn this_partition(h: u64, thread_no: u64, n_partitions: u64) -> bool {
debug_assert!(n_partitions.is_power_of_two());
(h.wrapping_add(thread_no)) & n_partitions.wrapping_sub(1) == 0
}
pub(crate) fn prepare_hashed_relation_threaded<T, I>(
iters: Vec<I>,
) -> Vec<HashMap<T, (bool, Vec<IdxSize>), RandomState>>
where
I: Iterator<Item = T> + Send + TrustedLen,
T: Send + Hash + Eq + Sync + Copy,
{
let n_partitions = iters.len();
assert!(n_partitions.is_power_of_two());
let (hashes_and_keys, build_hasher) = create_hash_and_keys_threaded_vectorized(iters, None);
POOL.install(|| {
(0..n_partitions).into_par_iter().map(|partition_no| {
let build_hasher = build_hasher.clone();
let hashes_and_keys = &hashes_and_keys;
let partition_no = partition_no as u64;
let mut hash_tbl: HashMap<T, (bool, Vec<IdxSize>), RandomState> =
HashMap::with_hasher(build_hasher);
let n_threads = n_partitions as u64;
let mut offset = 0;
for hashes_and_keys in hashes_and_keys {
let len = hashes_and_keys.len();
hashes_and_keys
.iter()
.enumerate()
.for_each(|(idx, (h, k))| {
let idx = idx as IdxSize;
if this_partition(*h, partition_no, n_threads) {
let idx = idx + offset;
let entry = hash_tbl
.raw_entry_mut()
.from_key_hashed_nocheck(*h, k);
match entry {
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(*h, *k, (false, vec![idx]));
}
RawEntryMut::Occupied(mut entry) => {
let (_k, v) = entry.get_key_value_mut();
v.1.push(idx);
}
}
}
});
offset += len as IdxSize;
}
hash_tbl
})
})
.collect()
}
pub(crate) fn create_hash_and_keys_threaded_vectorized<I, T>(
iters: Vec<I>,
build_hasher: Option<RandomState>,
) -> (Vec<Vec<(u64, T)>>, RandomState)
where
I: IntoIterator<Item = T> + Send,
I::IntoIter: TrustedLen,
T: Send + Hash + Eq,
{
let build_hasher = build_hasher.unwrap_or_default();
let hashes = POOL.install(|| {
iters
.into_par_iter()
.map(|iter| {
iter.into_iter()
.map(|val| {
let mut hasher = build_hasher.build_hasher();
val.hash(&mut hasher);
(hasher.finish(), val)
})
.collect_trusted::<Vec<_>>()
})
.collect()
});
(hashes, build_hasher)
}
#[inline]
pub fn _boost_hash_combine(l: u64, r: u64) -> u64 {
l ^ r.wrapping_add(0x9e3779b9u64.wrapping_add(l << 6).wrapping_add(r >> 2))
}
pub(crate) fn df_rows_to_hashes_threaded(
keys: &[DataFrame],
hasher_builder: Option<RandomState>,
) -> PolarsResult<(Vec<UInt64Chunked>, RandomState)> {
let hasher_builder = hasher_builder.unwrap_or_default();
let hashes = POOL.install(|| {
keys.into_par_iter()
.map(|df| {
let hb = hasher_builder.clone();
let (ca, _) = df_rows_to_hashes(df, Some(hb))?;
Ok(ca)
})
.collect::<PolarsResult<Vec<_>>>()
})?;
Ok((hashes, hasher_builder))
}
pub(crate) fn df_rows_to_hashes(
keys: &DataFrame,
build_hasher: Option<RandomState>,
) -> PolarsResult<(UInt64Chunked, RandomState)> {
let build_hasher = build_hasher.unwrap_or_default();
let mut iter = keys.iter();
let first = iter.next().expect("at least one key");
let mut hashes = vec![];
first.vec_hash(build_hasher.clone(), &mut hashes)?;
let hslice = hashes.as_mut_slice();
for keys in iter {
keys.vec_hash_combine(build_hasher.clone(), hslice)?;
}
let chunks = vec![Box::new(PrimitiveArray::new(
ArrowDataType::UInt64,
hashes.into(),
None,
)) as ArrayRef];
Ok((UInt64Chunked::from_chunks("", chunks), build_hasher))
}