use std::fmt::{self, Debug};
use std::ops::Sub;
use arrow::array::BooleanArray;
use arrow::buffer::BooleanBuffer;
use arrow::datatypes::ArrowNativeType;
use hashbrown::HashTable;
use hashbrown::hash_table::Entry::{Occupied, Vacant};
/// Interface shared by the join hash maps in this module.
///
/// The implementations in this file key build-side rows by their `u64` hash value and
/// link rows that share a hash through a separate "next" chain.
pub trait JoinHashMapType: Send + Sync {
    /// Extends the chain storage by `len` slots.
    ///
    /// NOTE(review): meaning inferred from the name; both implementations in this file
    /// ignore the call.
    fn extend_zero(&mut self, len: usize);

    /// Inserts the `(row, hash)` pairs yielded by `iter`.
    ///
    /// `deleted_offset` is subtracted from `row` when addressing the chain storage.
    fn update_from_iter<'a>(
        &mut self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
        deleted_offset: usize,
    );

    /// Looks up every `(probe_row, hash)` pair from `iter`.
    ///
    /// Returns `(probe_row_indices, build_row_indices)`, with one entry in each vector
    /// per match.
    fn get_matched_indices<'a>(
        &self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
        deleted_offset: Option<usize>,
    ) -> (Vec<u32>, Vec<u64>);

    /// Paged lookup of `hash_values`.
    ///
    /// Fills `input_indices` / `match_indices` with matches, starting at `offset` and
    /// bounded by `limit`. Returns the offset to resume from, or `None` once every probe
    /// hash has been processed.
    fn get_matched_indices_with_limit_offset(
        &self,
        hash_values: &[u64],
        limit: usize,
        offset: MapOffset,
        input_indices: &mut Vec<u32>,
        match_indices: &mut Vec<u64>,
    ) -> Option<MapOffset>;

    /// Returns, for each entry of `hash_values`, whether that hash is present in the map.
    fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray;

    /// Returns `true` when the map holds no entries.
    fn is_empty(&self) -> bool;

    /// Number of entries in the map.
    ///
    /// For the implementations in this file this is the number of distinct hash values,
    /// not the number of build rows.
    fn len(&self) -> usize;
}
/// Join hash map whose row links are stored as `u32`.
///
/// As written by `update_from_iter`:
/// - `map` associates each distinct hash with `row + 1` of the most recently inserted
///   row carrying that hash.
/// - `next[row - deleted_offset]` holds the `row + 1` of the previous row with the same
///   hash.
/// - A value of 0 in `next` ends a chain.
pub struct JoinHashMapU32 {
    map: HashTable<(u64, u32)>,
    next: Vec<u32>,
}

impl JoinHashMapU32 {
    /// Assembles a map from already-built parts (test-only helper).
    #[cfg(test)]
    pub(crate) fn new(map: HashTable<(u64, u32)>, next: Vec<u32>) -> Self {
        Self { map, next }
    }

    /// Creates an empty map.
    ///
    /// The hash table is sized for `cap` entries, and `next` holds `cap` zero
    /// (end-of-chain) links.
    pub fn with_capacity(cap: usize) -> Self {
        let map = HashTable::with_capacity(cap);
        let next = vec![0; cap];
        Self { map, next }
    }
}

impl Debug for JoinHashMapU32 {
    /// Writes nothing to the formatter.
    fn fmt(&self, _formatter: &mut fmt::Formatter) -> fmt::Result {
        Ok(())
    }
}
impl JoinHashMapType for JoinHashMapU32 {
    /// No-op for this map; `next` is allocated at full length by `with_capacity`.
    fn extend_zero(&mut self, _additional: usize) {}

    /// Delegates to the module-level `update_from_iter` with `u32` links.
    fn update_from_iter<'a>(
        &mut self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
        deleted_offset: usize,
    ) {
        let Self { map, next } = self;
        update_from_iter(map, next, iter, deleted_offset);
    }

    /// Delegates to the module-level `get_matched_indices`.
    fn get_matched_indices<'a>(
        &self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
        deleted_offset: Option<usize>,
    ) -> (Vec<u32>, Vec<u64>) {
        let Self { map, next } = self;
        get_matched_indices(map, next, iter, deleted_offset)
    }

    /// Delegates to the module-level `get_matched_indices_with_limit_offset`.
    fn get_matched_indices_with_limit_offset(
        &self,
        hash_values: &[u64],
        limit: usize,
        offset: MapOffset,
        input_indices: &mut Vec<u32>,
        match_indices: &mut Vec<u64>,
    ) -> Option<MapOffset> {
        let Self { map, next } = self;
        get_matched_indices_with_limit_offset(
            map,
            next,
            hash_values,
            limit,
            offset,
            input_indices,
            match_indices,
        )
    }

    /// Delegates to the module-level `contain_hashes`.
    fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
        contain_hashes(&self.map, hash_values)
    }

    /// `true` when no hash value has been inserted.
    fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of distinct hash values stored.
    fn len(&self) -> usize {
        self.map.len()
    }
}
/// Join hash map whose row links are stored as `u64`.
///
/// As written by `update_from_iter`:
/// - `map` associates each distinct hash with `row + 1` of the most recently inserted
///   row carrying that hash.
/// - `next[row - deleted_offset]` holds the `row + 1` of the previous row with the same
///   hash.
/// - A value of 0 in `next` ends a chain.
pub struct JoinHashMapU64 {
    map: HashTable<(u64, u64)>,
    next: Vec<u64>,
}

impl JoinHashMapU64 {
    /// Assembles a map from already-built parts (test-only helper).
    #[cfg(test)]
    pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
        Self { map, next }
    }

    /// Creates an empty map.
    ///
    /// The hash table is sized for `cap` entries, and `next` holds `cap` zero
    /// (end-of-chain) links.
    pub fn with_capacity(cap: usize) -> Self {
        let map = HashTable::with_capacity(cap);
        let next = vec![0; cap];
        Self { map, next }
    }
}

impl Debug for JoinHashMapU64 {
    /// Writes nothing to the formatter.
    fn fmt(&self, _formatter: &mut fmt::Formatter) -> fmt::Result {
        Ok(())
    }
}
impl JoinHashMapType for JoinHashMapU64 {
    /// No-op for this map; `next` is allocated at full length by `with_capacity`.
    fn extend_zero(&mut self, _additional: usize) {}

    /// Delegates to the module-level `update_from_iter` with `u64` links.
    fn update_from_iter<'a>(
        &mut self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
        deleted_offset: usize,
    ) {
        let Self { map, next } = self;
        update_from_iter(map, next, iter, deleted_offset);
    }

    /// Delegates to the module-level `get_matched_indices`.
    fn get_matched_indices<'a>(
        &self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
        deleted_offset: Option<usize>,
    ) -> (Vec<u32>, Vec<u64>) {
        let Self { map, next } = self;
        get_matched_indices(map, next, iter, deleted_offset)
    }

    /// Delegates to the module-level `get_matched_indices_with_limit_offset`.
    fn get_matched_indices_with_limit_offset(
        &self,
        hash_values: &[u64],
        limit: usize,
        offset: MapOffset,
        input_indices: &mut Vec<u32>,
        match_indices: &mut Vec<u64>,
    ) -> Option<MapOffset> {
        let Self { map, next } = self;
        get_matched_indices_with_limit_offset(
            map,
            next,
            hash_values,
            limit,
            offset,
            input_indices,
            match_indices,
        )
    }

    /// Delegates to the module-level `contain_hashes`.
    fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
        contain_hashes(&self.map, hash_values)
    }

    /// `true` when no hash value has been inserted.
    fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of distinct hash values stored.
    fn len(&self) -> usize {
        self.map.len()
    }
}
use crate::joins::MapOffset;
use crate::joins::chain::traverse_chain;
/// Inserts build-side rows into `map` / `next`, chaining rows that share a hash value.
///
/// For each `(row, hash)` yielded by `iter`:
/// - The map entry for `hash` is set to `row + 1`. The value is 1-based so that 0 can
///   mark the end of a chain in `next`.
/// - If `hash` was already present, its previous value is written to
///   `next[row - deleted_offset]`, linking the new row to the older one.
/// - If `hash` is new, the row's `next` slot is left untouched.
///
/// # Panics
///
/// Panics if `row + 1` does not fit in `T`. For a repeated hash, also panics if
/// `row - deleted_offset` underflows or is out of bounds for `next`.
pub fn update_from_iter<'a, T>(
    map: &mut HashTable<(u64, T)>,
    next: &mut [T],
    iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
    deleted_offset: usize,
) where
    T: Copy + TryFrom<usize> + PartialOrd,
    <T as TryFrom<usize>>::Error: Debug,
{
    iter.for_each(|(row, &hash)| {
        let slot = map.entry(hash, |&(stored, _)| stored == hash, |&(stored, _)| stored);
        // 1-based row number; 0 stays reserved for "end of chain".
        let encoded = T::try_from(row + 1).unwrap();
        match slot {
            Occupied(mut occupied) => {
                // The newest row becomes the chain head; the old head is linked behind it.
                let previous_head = std::mem::replace(&mut occupied.get_mut().1, encoded);
                next[row - deleted_offset] = previous_head;
            }
            Vacant(vacant) => {
                vacant.insert((hash, encoded));
            }
        }
    });
}
/// Probes `map` with every `(probe_row, hash)` pair from `iter` and walks each matching
/// hash's chain.
///
/// Returns `(probe_rows, build_rows)`, with one entry in each vector per match:
/// - the probe row index (cast to `u32`);
/// - the matching build row index.
///
/// Each chain starts at the row stored in `map` for the hash and follows `next` until a
/// 0 link. With the encoding written by `update_from_iter`, that is newest row first.
///
/// With `Some(deleted_offset)`, stored row numbers are shifted down by the offset before
/// indexing `next`, and the walk stops at the first row below the offset.
pub fn get_matched_indices<'a, T>(
    map: &HashTable<(u64, T)>,
    next: &[T],
    iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
    deleted_offset: Option<usize>,
) -> (Vec<u32>, Vec<u64>)
where
    T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
    <T as TryFrom<usize>>::Error: Debug,
{
    let mut probe_rows: Vec<u32> = Vec::new();
    let mut build_rows: Vec<u64> = Vec::new();
    let end_of_chain = T::try_from(0).unwrap();
    let one = T::try_from(1).unwrap();

    for (probe_row, &hash) in iter {
        if let Some(&(_, head)) = map.find(hash, |&(stored, _)| stored == hash) {
            // Same value for every link of this chain, so convert it once per matched hash.
            let shift = deleted_offset.map(|off| T::try_from(off).unwrap());
            let mut absolute = head - one;
            loop {
                let relative = match shift {
                    // Rows below the offset have been pruned: the chain ends here.
                    Some(off) if absolute < off => break,
                    Some(off) => absolute - off,
                    None => absolute,
                };
                let relative_u64: u64 = relative.into();
                build_rows.push(relative_u64);
                probe_rows.push(probe_row as u32);
                let link = next[relative_u64 as usize];
                if link == end_of_chain {
                    break;
                }
                absolute = link - one;
            }
        }
    }

    (probe_rows, build_rows)
}
/// Paged probe of `map`: fills `input_indices` / `match_indices` with matches for
/// `hash_values`, starting at `offset` and bounded by `limit`.
///
/// Both output buffers are cleared first.
/// - `input_indices` receives positions in `hash_values`.
/// - `match_indices` receives build-side row indices.
///
/// Returns `None` when the last probe hash has been processed. Otherwise it returns the
/// offset to pass to the next call.
///
/// NOTE(review): `limit` is applied differently on the two paths.
/// - In the unique-hash fast path it bounds the number of probe rows examined.
/// - In the chained path it is handed to `traverse_chain` (defined elsewhere) as the
///   remaining output budget.
pub fn get_matched_indices_with_limit_offset<T>(
map: &HashTable<(u64, T)>,
next_chain: &[T],
hash_values: &[u64],
limit: usize,
offset: MapOffset,
input_indices: &mut Vec<u32>,
match_indices: &mut Vec<u64>,
) -> Option<MapOffset>
where
T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
<T as TryFrom<usize>>::Error: Debug,
T: ArrowNativeType,
{
// Outputs are rebuilt from scratch on every call.
input_indices.clear();
match_indices.clear();
let one = T::try_from(1).unwrap();
// Fast path: as many distinct hashes as chain slots. Assuming `next_chain` has one slot
// per inserted build row (as allocated by `with_capacity`), no hash is shared, so there
// are no chains to walk.
if map.len() == next_chain.len() {
// Only `offset.0` is consulted here; this path never produces a chain position.
let start = offset.0;
let end = (start + limit).min(hash_values.len());
for (i, &hash) in hash_values[start..end].iter().enumerate() {
if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
input_indices.push(start as u32 + i as u32);
// Stored values are `row + 1`; convert back to a 0-based row index.
match_indices.push((*idx - one).into());
}
}
return if end == hash_values.len() {
None
} else {
Some((end, None))
};
}
let mut remaining_output = limit;
// Decode the resume offset `(probe_idx, chain_position)` into the first probe index
// still to be scanned.
let to_skip = match offset {
// Probe row `idx` has not been started yet.
(idx, None) => idx,
// Probe row `idx` was fully processed by the previous call; continue after it.
(idx, Some(0)) => idx + 1,
// Probe row `idx` was interrupted mid-chain: finish its chain from `next_idx` first.
(idx, Some(next_idx)) => {
let next_idx: T = T::usize_as(next_idx as usize);
// NOTE(review): assumes `hash_values` is non-empty here (underflows otherwise).
let is_last = idx == hash_values.len() - 1;
// NOTE(review): presumably returns `Some` when the output budget runs out
// mid-chain; confirm against `traverse_chain`.
if let Some(next_offset) = traverse_chain(
next_chain,
idx,
next_idx,
&mut remaining_output,
input_indices,
match_indices,
is_last,
) {
return Some(next_offset);
}
idx + 1
}
};
let hash_values_len = hash_values.len();
for (i, &hash) in hash_values[to_skip..].iter().enumerate() {
let row_idx = to_skip + i;
if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
// The raw stored value (`row + 1`) is passed on; decoding is left to
// `traverse_chain`.
let idx: T = *idx;
let is_last = row_idx == hash_values_len - 1;
if let Some(next_offset) = traverse_chain(
next_chain,
row_idx,
idx,
&mut remaining_output,
input_indices,
match_indices,
is_last,
) {
return Some(next_offset);
}
}
}
None
}
/// Builds a non-nullable boolean array with one slot per entry of `hash_values`.
///
/// Slot `i` is `true` exactly when `hash_values[i]` is a key of `map`.
pub fn contain_hashes<T>(map: &HashTable<(u64, T)>, hash_values: &[u64]) -> BooleanArray {
    let is_present = |hash: u64| map.find(hash, |(stored, _)| *stored == hash).is_some();
    let bits = BooleanBuffer::collect_bool(hash_values.len(), |pos| is_present(hash_values[pos]));
    BooleanArray::new(bits, None)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_contain_hashes() {
        let mut hash_map = JoinHashMapU32::with_capacity(10);
        hash_map.update_from_iter(Box::new([10u64, 20u64, 30u64].iter().enumerate()), 0);

        // A plain array is enough here; no heap allocation needed (clippy::useless_vec).
        let probe_hashes = [10u64, 11, 20, 21, 30, 31];
        let array = hash_map.contain_hashes(&probe_hashes);

        assert_eq!(array.len(), probe_hashes.len());
        for (i, &hash) in probe_hashes.iter().enumerate() {
            if matches!(hash, 10 | 20 | 30) {
                assert!(array.value(i), "Hash {hash} should exist in the map");
            } else {
                assert!(!array.value(i), "Hash {hash} should NOT exist in the map");
            }
        }
    }

    #[test]
    fn test_get_matched_indices_follows_chain() {
        let mut hash_map = JoinHashMapU32::with_capacity(3);
        // Rows 0 and 2 share hash 10, so they form one chain (newest row first).
        hash_map.update_from_iter(Box::new([10u64, 20, 10].iter().enumerate()), 0);
        assert_eq!(hash_map.len(), 2);
        assert!(!hash_map.is_empty());

        let probe = [10u64, 99, 20];
        let (input_indices, match_indices) =
            hash_map.get_matched_indices(Box::new(probe.iter().enumerate()), None);
        assert_eq!(input_indices, vec![0, 0, 2]);
        assert_eq!(match_indices, vec![2, 0, 1]);
    }

    #[test]
    fn test_get_matched_indices_with_limit_offset_unique_hashes() {
        // All hashes distinct and map.len() == next.len(): exercises the fast path.
        let mut hash_map = JoinHashMapU32::with_capacity(3);
        hash_map.update_from_iter(Box::new([10u64, 20, 30].iter().enumerate()), 0);

        let probe = [20u64, 99, 10, 30];
        let mut input_indices = vec![];
        let mut match_indices = vec![];

        let resume = hash_map.get_matched_indices_with_limit_offset(
            &probe,
            2,
            (0, None),
            &mut input_indices,
            &mut match_indices,
        );
        assert_eq!(input_indices, vec![0]);
        assert_eq!(match_indices, vec![1]);
        assert_eq!(resume, Some((2, None)));

        let resume = hash_map.get_matched_indices_with_limit_offset(
            &probe,
            2,
            resume.unwrap(),
            &mut input_indices,
            &mut match_indices,
        );
        // Buffers are cleared on each call, so only the second page is present.
        assert_eq!(input_indices, vec![2, 3]);
        assert_eq!(match_indices, vec![0, 2]);
        assert_eq!(resume, None);
    }
}