use polars_utils::hashing::{DirtyHash, hash_to_partition};
use polars_utils::nulls::IsNull;
use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};
use super::*;
/// Builds the membership sets that the semi and anti joins probe against.
///
/// One hash set is created per partition; the partition count is taken from
/// `_set_partition_size()`. Every partition walks all chunks in `keys` and
/// stores only the keys that `hash_to_partition` assigns to that partition,
/// so the set at index `p` of the result holds the keys of partition `p`.
///
/// Null keys are stored only when `nulls_equal` is `true`.
///
/// The partitions are built in parallel; the work is driven inside
/// `RAYON.install`.
pub(super) fn build_table_semi_anti<T, I>(
    keys: Vec<I>,
    nulls_equal: bool,
) -> Vec<PlHashSet<<T as ToTotalOrd>::TotalOrdItem>>
where
    T: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T as ToTotalOrd>::TotalOrdItem: Send + Sync + Hash + Eq + DirtyHash + IsNull,
    I: IntoIterator<Item = T> + Copy + Send + Sync,
{
    let n_partitions = _set_partition_size();

    let partition_sets = (0..n_partitions).into_par_iter().map(|partition_no| {
        let mut members: PlHashSet<T::TotalOrdItem> =
            PlHashSet::with_capacity(_HASHMAP_INIT_SIZE);

        for chunk in &keys {
            // `I: Copy`, so every partition can iterate its own copy of the chunk.
            for key in *chunk {
                let key = key.to_total_ord();

                // Keys that hash to another partition are handled by that partition.
                if partition_no != hash_to_partition(key.dirty_hash(), n_partitions) {
                    continue;
                }
                // Nulls only take part in the join when they compare equal.
                if key.is_null() && !nulls_equal {
                    continue;
                }
                members.insert(key);
            }
        }
        members
    });

    RAYON.install(|| partition_sets.collect())
}
/// Probes `probe` against the keys of `build` and reports, for every probed
/// key, whether it was found.
///
/// The build-side sets are created eagerly through `build_table_semi_anti`.
/// The probing is only described here: the returned parallel iterator is not
/// driven by this function, so the caller chooses where it is consumed.
///
/// Each yielded item is `(row_index, found)`. `row_index` is the position of
/// the key within its probe chunk plus that chunk's entry from
/// `probe_to_offsets`, cast to `IdxSize`.
fn semi_anti_impl<T, I>(
    probe: Vec<I>,
    build: Vec<I>,
    nulls_equal: bool,
) -> impl ParallelIterator<Item = (IdxSize, bool)>
where
    I: IntoIterator<Item = T> + Copy + Send + Sync,
    T: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T as ToTotalOrd>::TotalOrdItem: Send + Sync + Hash + Eq + DirtyHash + IsNull,
{
    // Hash the build side first.
    let hash_sets = build_table_semi_anti(build, nulls_equal);

    // Per-chunk offsets turn a position inside a chunk into the reported row index.
    let offsets = probe_to_offsets(&probe);

    let n_tables = hash_sets.len();

    probe
        .into_par_iter()
        .zip(offsets)
        .flat_map(move |(probe, offset)| {
            // Borrow locally so the inner closure does not try to move the sets.
            let hash_sets = &hash_sets;
            let probe_iter = probe.into_iter();

            // One result is produced per probed key. The size hint is only used
            // to pre-allocate, so fall back to the lower bound when the iterator
            // reports no upper bound instead of panicking.
            let (lower, upper) = probe_iter.size_hint();
            let mut results = Vec::with_capacity(upper.unwrap_or(lower));

            probe_iter.enumerate().for_each(|(idx_a, k)| {
                let k = k.to_total_ord();
                let idx_a = (idx_a + offset) as IdxSize;

                // Select the set of the partition this key hashes to.
                let partition = hash_to_partition(k.dirty_hash(), n_tables);
                // SAFETY: `n_tables` is `hash_sets.len()`; this relies on
                // `hash_to_partition(_, n_tables)` returning an index below
                // `n_tables` and on at least one table existing.
                // NOTE(review): confirm both against `hash_to_partition` and
                // `_set_partition_size`.
                let current_probe_table = unsafe { hash_sets.get_unchecked(partition) };

                // `true` when the key exists on the build side, `false` otherwise.
                results.push((idx_a, current_probe_table.contains(&k)));
            });
            results
        })
}
/// Left anti join: collects the row indices of `probe` whose key was not
/// found among the keys of `build`.
///
/// `nulls_equal` is forwarded unchanged to `semi_anti_impl`. The resulting
/// parallel iterator is collected inside `RAYON.install`.
pub(super) fn hash_join_tuples_left_anti<T, I>(
    probe: Vec<I>,
    build: Vec<I>,
    nulls_equal: bool,
) -> Vec<IdxSize>
where
    I: IntoIterator<Item = T> + Copy + Send + Sync,
    T: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T as ToTotalOrd>::TotalOrdItem: Send + Sync + Hash + Eq + DirtyHash + IsNull,
{
    // Keep the index of every probe row that has no match.
    let unmatched_rows = semi_anti_impl(probe, build, nulls_equal)
        .filter_map(|(row_idx, found)| (!found).then_some(row_idx));

    RAYON.install(|| unmatched_rows.collect())
}
/// Left semi join: collects the row indices of `probe` whose key was found
/// among the keys of `build`.
///
/// `nulls_equal` is forwarded unchanged to `semi_anti_impl`. The resulting
/// parallel iterator is collected inside `RAYON.install`.
pub(super) fn hash_join_tuples_left_semi<T, I>(
    probe: Vec<I>,
    build: Vec<I>,
    nulls_equal: bool,
) -> Vec<IdxSize>
where
    I: IntoIterator<Item = T> + Copy + Send + Sync,
    T: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T as ToTotalOrd>::TotalOrdItem: Send + Sync + Hash + Eq + DirtyHash + IsNull,
{
    // Keep the index of every probe row that has a match.
    let matched_rows = semi_anti_impl(probe, build, nulls_equal)
        .filter_map(|(row_idx, found)| found.then_some(row_idx));

    RAYON.install(|| matched_rows.collect())
}