use crate::datatypes::UInt64Chunked;
use crate::prelude::*;
use crate::POOL;
use ahash::RandomState;
use arrow::array::ArrayRef;
use hashbrown::{hash_map::RawEntryMut, HashMap};
use itertools::Itertools;
use rayon::prelude::*;
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
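/// Hash every value of a ChunkedArray with the given `RandomState` build hasher,
/// producing one `u64` per row as a `UInt64Chunked`. Null values are hashed through
/// their `Option::None` representation.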
pub trait VecHash {
fn vec_hash(&self, _random_state: RandomState) -> UInt64Chunked {
unimplemented!()
}
}
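/// Use the integer values themselves as 64 bit "hashes", skipping the hasher entirely
/// for keys that already fit in a `u64`. Null values map to the fixed sentinel
/// `RANDOM_U64`.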
pub trait VecHashId {
fn vec_hash_id(&self) -> UInt64Chunked {
unimplemented!()
}
}
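/// Arbitrary sentinel used as the "hash" of a null entry in the `VecHashId` impls.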
const RANDOM_U64: u64 = 4352984574;
impl VecHashId for UInt64Chunked {
fn vec_hash_id(&self) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| match opt_v {
None => RANDOM_U64,
Some(v) => v,
})
}
}
impl VecHashId for UInt32Chunked {
fn vec_hash_id(&self) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| match opt_v {
None => RANDOM_U64,
Some(v) => v as u64,
})
}
}
impl VecHashId for Int32Chunked {
fn vec_hash_id(&self) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| match opt_v {
None => RANDOM_U64,
            // an `as` cast reinterprets the i32 bits as u32, then zero-extends to u64
            Some(v) => v as u32 as u64,
})
}
}
impl VecHashId for Int64Chunked {
fn vec_hash_id(&self) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| match opt_v {
None => RANDOM_U64,
            // an `as` cast between i64 and u64 is a plain bit reinterpretation
            Some(v) => v as u64,
})
}
}
impl Series {
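    /// Dispatch `vec_hash_id` to the integer ChunkedArray behind this Series.
    /// Only 32 and 64 bit integer dtypes are supported.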
pub fn vec_hash_id(&self) -> UInt64Chunked {
use DataType::*;
match self.dtype() {
UInt64 => self.u64().unwrap().vec_hash_id(),
Int64 => self.i64().unwrap().vec_hash_id(),
Int32 => self.i32().unwrap().vec_hash_id(),
UInt32 => self.u32().unwrap().vec_hash_id(),
_ => unimplemented!(),
}
}
}
impl<T> VecHash for ChunkedArray<T>
where
T: PolarsIntegerType,
T::Native: Hash,
{
fn vec_hash(&self, random_state: RandomState) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| {
let mut hasher = random_state.build_hasher();
opt_v.hash(&mut hasher);
hasher.finish()
})
}
}
impl VecHash for Utf8Chunked {
fn vec_hash(&self, random_state: RandomState) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| {
let mut hasher = random_state.build_hasher();
opt_v.hash(&mut hasher);
hasher.finish()
})
}
}
impl VecHash for BooleanChunked {
fn vec_hash(&self, random_state: RandomState) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| {
let mut hasher = random_state.build_hasher();
opt_v.hash(&mut hasher);
hasher.finish()
})
}
}
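// `f32`/`f64` do not implement `Hash`; hash their bit patterns instead.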
impl VecHash for Float32Chunked {
fn vec_hash(&self, random_state: RandomState) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| {
let opt_v = opt_v.map(|v| v.to_bits());
let mut hasher = random_state.build_hasher();
opt_v.hash(&mut hasher);
hasher.finish()
})
}
}
impl VecHash for Float64Chunked {
fn vec_hash(&self, random_state: RandomState) -> UInt64Chunked {
self.branch_apply_cast_numeric_no_null(|opt_v| {
let opt_v = opt_v.map(|v| v.to_bits());
let mut hasher = random_state.build_hasher();
opt_v.hash(&mut hasher);
hasher.finish()
})
}
}
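// Hashing lists is not supported; this falls back to the default `unimplemented!()` body.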
impl VecHash for ListChunked {}
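/// A pass-through hasher for keys that are already hashes: `write_u64` stores the
/// value as-is, so hash maps built with `IdBuildHasher` do not hash their 64 bit
/// keys a second time.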
pub struct IdHasher {
hash: u64,
}
impl Hasher for IdHasher {
fn finish(&self) -> u64 {
self.hash
}
fn write(&mut self, _bytes: &[u8]) {
unreachable!("IdHasher should only be used for integer keys <= 64 bit precision")
}
fn write_u32(&mut self, i: u32) {
self.write_u64(i as u64)
}
fn write_u64(&mut self, i: u64) {
self.hash = i;
}
    fn write_i32(&mut self, i: i32) {
        self.write_u32(i as u32)
    }
    fn write_i64(&mut self, i: i64) {
        self.write_u64(i as u64)
    }
}
impl Default for IdHasher {
fn default() -> Self {
IdHasher { hash: 0 }
}
}
pub type IdBuildHasher = BuildHasherDefault<IdHasher>;
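/// A row index paired with the precomputed hash of the key at that row. The `Hash`
/// impl forwards the stored hash, so the key itself never has to be rehashed.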
#[derive(Debug)]
pub(crate) struct IdxHash {
pub(crate) idx: u32,
hash: u64,
}
impl Hash for IdxHash {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.hash)
}
}
impl IdxHash {
#[inline]
pub(crate) fn new(idx: u32, hash: u64) -> Self {
IdxHash { idx, hash }
}
}
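/// Determine whether hash `h` falls in the partition owned by `thread_no`.
/// Every hash is assigned to exactly one of the `n_threads` partitions.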
#[inline]
pub(crate) fn this_thread(h: u64, thread_no: u64, n_threads: u64) -> bool {
(h + thread_no) % n_threads == 0
}
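/// Insert pre-hashed `(hash, key)` pairs into `hash_tbl`, collecting for every distinct
/// key the row indices (shifted by `offset`) at which it occurs.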
fn finish_table_from_key_hashes<T>(
hashes_nd_keys: Vec<(u64, T)>,
mut hash_tbl: HashMap<T, Vec<u32>, RandomState>,
offset: usize,
) -> HashMap<T, Vec<u32>, RandomState>
where
T: Hash + Eq,
{
hashes_nd_keys
.into_iter()
.enumerate()
.for_each(|(idx, (h, t))| {
let idx = (idx + offset) as u32;
hash_tbl
.raw_entry_mut()
.from_key_hashed_nocheck(h, &t)
.and_modify(|_k, v| {
v.push(idx);
})
.or_insert_with(|| (t, vec![idx]));
});
hash_tbl
}
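/// Build a probe table that maps every distinct key produced by `b` to the row indices
/// at which that key occurs, using a freshly seeded `RandomState`.
///
/// For example, the keys `1, 2, 1` yield a table along the lines of
/// `{1: [0, 2], 2: [1]}` (illustrative; map iteration order is unspecified).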
pub(crate) fn prepare_hashed_relation<T>(
b: impl Iterator<Item = T>,
) -> HashMap<T, Vec<u32>, RandomState>
where
T: Hash + Eq,
{
let build_hasher = RandomState::default();
let hashes_nd_keys = b
.map(|val| {
let mut hasher = build_hasher.build_hasher();
val.hash(&mut hasher);
(hasher.finish(), val)
})
.collect::<Vec<_>>();
let hash_tbl: HashMap<T, Vec<u32>, RandomState> =
HashMap::with_capacity_and_hasher(hashes_nd_keys.len(), build_hasher);
finish_table_from_key_hashes(hashes_nd_keys, hash_tbl, 0)
}
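/// Multi-threaded variant of `prepare_hashed_relation`: the key hashes are computed in
/// parallel, then every thread builds its own table containing only the keys whose hash
/// falls in its partition (see `this_thread`), so each key ends up in exactly one of the
/// returned tables. Row indices are global over the concatenation of all iterators.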
pub(crate) fn prepare_hashed_relation_threaded<T, I>(
iters: Vec<I>,
) -> Vec<HashMap<T, Vec<u32>, RandomState>>
where
I: Iterator<Item = T> + Send,
T: Send + Hash + Eq + Sync + Copy,
{
let n_threads = iters.len();
let (hashes_and_keys, build_hasher) = create_hash_and_keys_threaded_vectorized(iters, None);
let size = hashes_and_keys.iter().fold(0, |acc, v| acc + v.len());
POOL.install(|| {
(0..n_threads).into_par_iter().map(|thread_no| {
let build_hasher = build_hasher.clone();
let hashes_and_keys = &hashes_and_keys;
let thread_no = thread_no as u64;
let mut hash_tbl: HashMap<T, Vec<u32>, RandomState> =
HashMap::with_capacity_and_hasher(size / (5 * n_threads), build_hasher);
let n_threads = n_threads as u64;
let mut offset = 0;
for hashes_and_keys in hashes_and_keys {
let len = hashes_and_keys.len();
hashes_and_keys
.iter()
.enumerate()
.for_each(|(idx, (h, k))| {
                        let idx = idx as u32;
                        // Partition the hashes over the threads: only keys whose
                        // hash is assigned to this thread end up in this table.
                        if this_thread(*h, thread_no, n_threads) {
                            let idx = idx + offset;
                            let entry = hash_tbl
                                .raw_entry_mut()
                                // `k` is already a `&T`; look up the entry with the
                                // precomputed hash instead of rehashing the key.
                                .from_key_hashed_nocheck(*h, k);
match entry {
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(*h, *k, vec![idx]);
}
RawEntryMut::Occupied(mut entry) => {
let (_k, v) = entry.get_key_value_mut();
v.push(idx);
}
}
}
});
offset += len as u32;
}
hash_tbl
})
})
.collect()
}
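/// Hash the values of every iterator in parallel, returning one `Vec<(hash, value)>` per
/// iterator together with the `RandomState` that was used (a default one if `None` was given).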
pub(crate) fn create_hash_and_keys_threaded_vectorized<I, T>(
iters: Vec<I>,
build_hasher: Option<RandomState>,
) -> (Vec<Vec<(u64, T)>>, RandomState)
where
I: IntoIterator<Item = T> + Send,
T: Send + Hash + Eq,
{
let build_hasher = build_hasher.unwrap_or_default();
let hashes = POOL.install(|| {
iters
.into_par_iter()
.map(|iter| {
iter.into_iter()
.map(|val| {
let mut hasher = build_hasher.build_hasher();
val.hash(&mut hasher);
(hasher.finish(), val)
})
.collect_vec()
})
.collect()
});
(hashes, build_hasher)
}
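/// Combine two hashes into one; used to fold the per-column hashes of a row into a
/// single row hash.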
fn combine_hashes(l: u64, r: u64) -> u64 {
let hash = (17 * 37u64).wrapping_add(l);
hash.wrapping_mul(37).wrapping_add(r)
}
pub(crate) fn df_rows_to_hashes_threaded(
keys: &[DataFrame],
hasher_builder: Option<RandomState>,
) -> (Vec<UInt64Chunked>, RandomState) {
let hasher_builder = hasher_builder.unwrap_or_default();
let hashes = POOL.install(|| {
keys.into_par_iter()
.map(|df| {
let hb = hasher_builder.clone();
let (ca, _) = df_rows_to_hashes(df, Some(hb));
ca
})
.collect()
});
(hashes, hasher_builder)
}
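/// Hash every row of a DataFrame: each column is hashed with `vec_hash` and the
/// per-column hashes are folded together with `combine_hashes`.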
pub(crate) fn df_rows_to_hashes(
keys: &DataFrame,
build_hasher: Option<RandomState>,
) -> (UInt64Chunked, RandomState) {
let build_hasher = build_hasher.unwrap_or_default();
let hashes: Vec<_> = keys
.columns
.iter()
.map(|s| {
let h = s.vec_hash(build_hasher.clone());
debug_assert_eq!(h.null_count(), 0);
h
})
.collect();
let mut iter = hashes.into_iter();
let first = iter.next().unwrap();
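    // Fold the remaining columns into the first column's hashes, chunk by chunk.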
(
iter.fold(first, |acc, s| {
let chunks = acc
.data_views()
.zip(s.data_views())
.map(|(array_left, array_right)| {
let av: AlignedVec<_> = array_left
.iter()
.zip(array_right)
.map(|(&l, &r)| combine_hashes(l, r))
.collect();
Arc::new(av.into_primitive_array::<UInt64Type>(None)) as ArrayRef
})
.collect();
UInt64Chunked::new_from_chunks("", chunks)
}),
build_hasher,
)
}