use ahash::RandomState;
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
use arrow::compute::take;
use arrow::datatypes::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use itertools::Itertools;
use std::collections::HashMap;
#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
as_generic_binary_array, as_large_list_array, as_large_list_view_array,
as_list_array, as_list_view_array, as_map_array, as_string_array,
as_string_view_array, as_struct_array, as_union_array,
};
use crate::error::Result;
use crate::error::{_internal_datafusion_err, _internal_err};
use std::cell::RefCell;
#[inline]
pub fn combine_hashes(l: u64, r: u64) -> u64 {
    // Fold `r` into `l` with the classic 17/37 polynomial mix:
    // ((17 * 37 + l) * 37 + r), using wrapping arithmetic so overflow
    // is well defined.
    let seeded = 629u64.wrapping_add(l); // 629 == 17 * 37
    seeded.wrapping_mul(37).wrapping_add(r)
}
/// Cap (in elements) on the capacity retained by the thread-local hash
/// buffer after use, so one very large batch does not pin memory for the
/// rest of the thread's lifetime.
const MAX_BUFFER_SIZE: usize = 524_288;
thread_local! {
    // Reusable per-thread scratch buffer for `with_hashes`, avoiding a
    // fresh allocation on every call. Mutably borrowed for the duration
    // of the callback, which is why reentrant calls fail.
    static HASH_BUFFER: RefCell<Vec<u64>> = const { RefCell::new(Vec::new()) };
}
/// Computes row hashes for `arrays` into a reusable thread-local buffer
/// and passes the resulting slice to `callback`.
///
/// This avoids allocating a hash buffer per call. Errors if `arrays` is
/// empty, if called reentrantly on the same thread (the thread-local
/// buffer is already borrowed), or if thread-local storage is not
/// accessible (thread destruction).
pub fn with_hashes<I, T, F, R>(
    arrays: I,
    random_state: &RandomState,
    callback: F,
) -> Result<R>
where
    I: IntoIterator<Item = T>,
    T: AsDynArray,
    F: FnOnce(&[u64]) -> Result<R>,
{
    // Peek at the first array to size the buffer; all arrays are assumed
    // to share this length (enforced downstream by the per-array asserts).
    let mut iter = arrays.into_iter().peekable();
    let required_size = match iter.peek() {
        Some(arr) => arr.as_dyn_array().len(),
        None => return _internal_err!("with_hashes requires at least one array"),
    };
    HASH_BUFFER.try_with(|cell| {
        // `try_borrow_mut` fails only if the callback (or a caller up the
        // stack) is already inside `with_hashes` on this thread.
        let mut buffer = cell.try_borrow_mut()
            .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?;
        buffer.clear();
        buffer.resize(required_size, 0);
        create_hashes(iter, random_state, &mut buffer[..required_size])?;
        let result = callback(&buffer[..required_size])?;
        // Release excess memory after unusually large batches.
        if buffer.capacity() > MAX_BUFFER_SIZE {
            buffer.truncate(MAX_BUFFER_SIZE);
            buffer.shrink_to_fit();
        }
        Ok(result)
    }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access thread-local storage during or after thread destruction"))?
}
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
    // Every row of a Null column hashes to the same value: the hash of
    // the constant `1`. Hoist it out of the loop since it never changes.
    let null_hash = random_state.hash_one(1);
    if mul_col {
        // Later column: fold the null hash into the existing row hashes.
        for hash in hashes_buffer.iter_mut() {
            *hash = combine_hashes(null_hash, *hash);
        }
    } else {
        // First column: simply overwrite every slot.
        hashes_buffer.fill(null_hash);
    }
}
/// Values that can be hashed to a `u64` with an [`RandomState`].
pub trait HashValue {
    /// Returns the hash of `self` under `state`.
    fn hash_one(&self, state: &RandomState) -> u64;
}
// Blanket impl: a reference hashes exactly like the value it refers to.
impl<T: HashValue + ?Sized> HashValue for &T {
    fn hash_one(&self, state: &RandomState) -> u64 {
        T::hash_one(self, state)
    }
}
// Implements `HashValue` for types that `RandomState::hash_one` can hash
// directly (i.e. types implementing `std::hash::Hash`).
macro_rules! hash_value {
    ($($t:ty),+) => {
        $(impl HashValue for $t {
            fn hash_one(&self, state: &RandomState) -> u64 {
                state.hash_one(self)
            }
        })+
    };
}
hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64, u128);
hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano);
// Floats do not implement `std::hash::Hash`, so they are hashed via their
// native-endian bit pattern reinterpreted as the unsigned integer of the
// same width. Note this means `0.0` and `-0.0` (different bit patterns)
// hash differently, and each NaN bit pattern hashes as itself.
macro_rules! hash_float_value {
    ($(($t:ty, $i:ty)),+) => {
        $(impl HashValue for $t {
            fn hash_one(&self, state: &RandomState) -> u64 {
                state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes()))
            }
        })+
    };
}
hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
/// Hashes every row of a primitive array into `hashes_buffer`.
///
/// With `rehash == true` each value's hash is combined into the existing
/// row hash (multi-column hashing); with `rehash == false` it overwrites
/// the slot. Null rows are left untouched. The four explicit branches
/// hoist the null check and the rehash flag out of the per-row loops.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array_primitive<T>(
    array: &PrimitiveArray<T>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) where
    T: ArrowPrimitiveType<Native: HashValue>,
{
    assert_eq!(
        hashes_buffer.len(),
        array.len(),
        "hashes_buffer and array should be of equal length"
    );
    if array.null_count() == 0 {
        // No nulls: iterate the raw values buffer directly.
        if rehash {
            for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) {
                *hash = combine_hashes(value.hash_one(random_state), *hash);
            }
        } else {
            for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) {
                *hash = value.hash_one(random_state);
            }
        }
    } else if rehash {
        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
            if !array.is_null(i) {
                // SAFETY: `i < hashes_buffer.len() == array.len()` per the
                // assert above.
                let value = unsafe { array.value_unchecked(i) };
                *hash = combine_hashes(value.hash_one(random_state), *hash);
            }
        }
    } else {
        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
            if !array.is_null(i) {
                // SAFETY: `i < array.len()` per the assert above.
                let value = unsafe { array.value_unchecked(i) };
                *hash = value.hash_one(random_state);
            }
        }
    }
}
/// Hashes every row of any array accessible through [`ArrayAccessor`]
/// (boolean, string, binary, fixed-size binary, ...).
///
/// Semantics match [`hash_array_primitive`]: `rehash` selects combine vs
/// overwrite, null rows are left untouched, and the branches hoist the
/// null check and rehash flag out of the loops.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array<T>(
    array: &T,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) where
    T: ArrayAccessor,
    T::Item: HashValue,
{
    assert_eq!(
        hashes_buffer.len(),
        array.len(),
        "hashes_buffer and array should be of equal length"
    );
    if array.null_count() == 0 {
        if rehash {
            for (i, hash) in hashes_buffer.iter_mut().enumerate() {
                // SAFETY: `i < hashes_buffer.len() == array.len()` per the
                // assert above.
                let value = unsafe { array.value_unchecked(i) };
                *hash = combine_hashes(value.hash_one(random_state), *hash);
            }
        } else {
            for (i, hash) in hashes_buffer.iter_mut().enumerate() {
                // SAFETY: `i < array.len()` per the assert above.
                let value = unsafe { array.value_unchecked(i) };
                *hash = value.hash_one(random_state);
            }
        }
    } else if rehash {
        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
            if !array.is_null(i) {
                // SAFETY: `i < array.len()` per the assert above.
                let value = unsafe { array.value_unchecked(i) };
                *hash = combine_hashes(value.hash_one(random_state), *hash);
            }
        }
    } else {
        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
            if !array.is_null(i) {
                // SAFETY: `i < array.len()` per the assert above.
                let value = unsafe { array.value_unchecked(i) };
                *hash = value.hash_one(random_state);
            }
        }
    }
}
/// Hashes a byte-view array (`StringViewArray` / `BinaryViewArray`).
///
/// Monomorphized over null presence, data-buffer presence, and
/// single/multi-column mode (`REHASH`) so the per-row loop stays
/// branch-light. Views whose payload is inline (length <= 12 bytes, or
/// any view when the array has no data buffers) are hashed directly from
/// the packed `u128` view word; longer values are hashed from their bytes
/// in the data buffers. Null rows keep their existing hash.
///
/// Fix: added the `force_hash_collisions` cfg gate — the only caller
/// (`hash_generic_byte_view_array`) is gated, so without it this function
/// is dead code under that feature (consistent with `hash_run_array_inner`).
#[cfg(not(feature = "force_hash_collisions"))]
#[inline(never)]
fn hash_string_view_array_inner<
    T: ByteViewType,
    const HAS_NULLS: bool,
    const HAS_BUFFERS: bool,
    const REHASH: bool,
>(
    array: &GenericByteViewArray<T>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) {
    assert_eq!(
        hashes_buffer.len(),
        array.len(),
        "hashes_buffer and array should be of equal length"
    );
    let buffers = array.data_buffers();
    // Resolves a non-inline view to its byte slice in the data buffers.
    let view_bytes = |view_len: u32, view: u128| {
        let view = ByteView::from(view);
        let offset = view.offset as usize;
        // SAFETY: a valid `GenericByteViewArray` guarantees `buffer_index`
        // and `offset..offset + view_len` are in bounds for its buffers.
        unsafe {
            let data = buffers.get_unchecked(view.buffer_index as usize);
            data.get_unchecked(offset..offset + view_len as usize)
        }
    };
    let hashes_and_views = hashes_buffer.iter_mut().zip(array.views().iter());
    for (i, (hash, &v)) in hashes_and_views.enumerate() {
        if HAS_NULLS && array.is_null(i) {
            continue;
        }
        // The low 32 bits of the view word hold the value length.
        let view_len = v as u32;
        if !HAS_BUFFERS || view_len <= 12 {
            // Inline payload: the packed view word encodes the value.
            if REHASH {
                *hash = combine_hashes(v.hash_one(random_state), *hash);
            } else {
                *hash = v.hash_one(random_state);
            }
            continue;
        }
        let value = view_bytes(view_len, v);
        if REHASH {
            *hash = combine_hashes(value.hash_one(random_state), *hash);
        } else {
            *hash = value.hash_one(random_state);
        }
    }
}
/// Hashes a byte-view array, dispatching on (has nulls, has data buffers,
/// rehash) to monomorphized implementations of the row loop.
///
/// The no-nulls/no-buffers cases are handled inline: with no data buffers
/// every view payload is inline, so rows hash directly from the packed
/// `u128` view words without any per-row checks.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_generic_byte_view_array<T: ByteViewType>(
    array: &GenericByteViewArray<T>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) {
    match (
        array.null_count() != 0,
        !array.data_buffers().is_empty(),
        rehash,
    ) {
        (false, false, false) => {
            for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) {
                *hash = view.hash_one(random_state);
            }
        }
        (false, false, true) => {
            for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) {
                *hash = combine_hashes(view.hash_one(random_state), *hash);
            }
        }
        (false, true, false) => hash_string_view_array_inner::<T, false, true, false>(
            array,
            random_state,
            hashes_buffer,
        ),
        (false, true, true) => hash_string_view_array_inner::<T, false, true, true>(
            array,
            random_state,
            hashes_buffer,
        ),
        (true, false, false) => hash_string_view_array_inner::<T, true, false, false>(
            array,
            random_state,
            hashes_buffer,
        ),
        (true, false, true) => hash_string_view_array_inner::<T, true, false, true>(
            array,
            random_state,
            hashes_buffer,
        ),
        (true, true, false) => hash_string_view_array_inner::<T, true, true, false>(
            array,
            random_state,
            hashes_buffer,
        ),
        (true, true, true) => hash_string_view_array_inner::<T, true, true, true>(
            array,
            random_state,
            hashes_buffer,
        ),
    }
}
/// Hashes a dictionary array by hashing the (typically small) dictionary
/// of values once, then mapping each key to its value's precomputed hash.
///
/// Monomorphized over null keys, null values, and single/multi-column
/// mode so the per-row loop carries no runtime flag checks. Rows whose
/// key is null, or whose referenced dictionary value is null, keep their
/// existing hash.
///
/// Fix: added the `force_hash_collisions` cfg gate — the only caller
/// (`hash_dictionary`) is gated, so without it this function is dead code
/// under that feature (consistent with `hash_run_array_inner`).
#[cfg(not(feature = "force_hash_collisions"))]
#[inline(never)]
fn hash_dictionary_inner<
    K: ArrowDictionaryKeyType,
    const HAS_NULL_KEYS: bool,
    const HAS_NULL_VALUES: bool,
    const MULTI_COL: bool,
>(
    array: &DictionaryArray<K>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()> {
    // Hash each dictionary value once; rows then reuse these hashes.
    let dict_values = array.values();
    let mut dict_hashes = vec![0; dict_values.len()];
    create_hashes([dict_values], random_state, &mut dict_hashes)?;
    if HAS_NULL_KEYS {
        for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
            if let Some(key) = key {
                let idx = key.as_usize();
                if !HAS_NULL_VALUES || dict_values.is_valid(idx) {
                    if MULTI_COL {
                        *hash = combine_hashes(dict_hashes[idx], *hash);
                    } else {
                        *hash = dict_hashes[idx];
                    }
                }
            }
        }
    } else {
        // Fast path: iterate the raw key buffer, no per-row validity check.
        for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().values()) {
            let idx = key.as_usize();
            if !HAS_NULL_VALUES || dict_values.is_valid(idx) {
                if MULTI_COL {
                    *hash = combine_hashes(dict_hashes[idx], *hash);
                } else {
                    *hash = dict_hashes[idx];
                }
            }
        }
    }
    Ok(())
}
/// Hashes a dictionary array, dispatching on (null keys, null values,
/// multi-column) to monomorphized implementations of the row loop.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
    array: &DictionaryArray<K>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    multi_col: bool,
) -> Result<()> {
    let has_null_keys = array.keys().null_count() != 0;
    let has_null_values = array.values().null_count() != 0;
    match (has_null_keys, has_null_values, multi_col) {
        (false, false, false) => hash_dictionary_inner::<K, false, false, false>(
            array,
            random_state,
            hashes_buffer,
        ),
        (false, false, true) => hash_dictionary_inner::<K, false, false, true>(
            array,
            random_state,
            hashes_buffer,
        ),
        (false, true, false) => hash_dictionary_inner::<K, false, true, false>(
            array,
            random_state,
            hashes_buffer,
        ),
        (false, true, true) => hash_dictionary_inner::<K, false, true, true>(
            array,
            random_state,
            hashes_buffer,
        ),
        (true, false, false) => hash_dictionary_inner::<K, true, false, false>(
            array,
            random_state,
            hashes_buffer,
        ),
        (true, false, true) => hash_dictionary_inner::<K, true, false, true>(
            array,
            random_state,
            hashes_buffer,
        ),
        (true, true, false) => hash_dictionary_inner::<K, true, true, false>(
            array,
            random_state,
            hashes_buffer,
        ),
        (true, true, true) => hash_dictionary_inner::<K, true, true, true>(
            array,
            random_state,
            hashes_buffer,
        ),
    }
}
/// Hashes a struct array: all child columns are hashed together into
/// per-row hashes, which are then folded into `hashes_buffer` for every
/// valid (non-null) struct row. Null rows keep their existing hash.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_struct_array(
    array: &StructArray,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()> {
    let num_rows = array.len();
    let mut child_hashes = vec![0u64; num_rows];
    create_hashes(array.columns(), random_state, &mut child_hashes)?;
    match array.nulls() {
        Some(validity) => {
            // Only fold in rows where the struct itself is non-null.
            for idx in validity.valid_indices() {
                hashes_buffer[idx] =
                    combine_hashes(hashes_buffer[idx], child_hashes[idx]);
            }
        }
        None => {
            for idx in 0..num_rows {
                hashes_buffer[idx] =
                    combine_hashes(hashes_buffer[idx], child_hashes[idx]);
            }
        }
    }
    Ok(())
}
/// Hashes a `MapArray` by hashing the key/value entry columns once, then
/// folding each row's range of entry hashes into that row's hash.
///
/// The entry columns are sliced to `[first_offset, last_offset)` so that
/// sliced map arrays hash only their visible entries; per-row offset
/// ranges are re-based by `first_offset` accordingly. Null rows keep
/// their existing hash.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_map_array(
    array: &MapArray,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()> {
    let nulls = array.nulls();
    let offsets = array.offsets();
    let first_offset = offsets.first().copied().unwrap_or_default() as usize;
    let last_offset = offsets.last().copied().unwrap_or_default() as usize;
    let entries_len = last_offset - first_offset;
    let mut values_hashes = vec![0u64; entries_len];
    let entries = array.entries();
    // Slice the entry struct's columns down to the visible range.
    let sliced_columns: Vec<ArrayRef> = entries
        .columns()
        .iter()
        .map(|col| col.slice(first_offset, entries_len))
        .collect();
    create_hashes(&sliced_columns, random_state, &mut values_hashes)?;
    if let Some(nulls) = nulls {
        // Adjacent offset pairs delimit each row's entries.
        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
            if nulls.is_valid(i) {
                let hash = &mut hashes_buffer[i];
                for values_hash in &values_hashes
                    [start.as_usize() - first_offset..stop.as_usize() - first_offset]
                {
                    *hash = combine_hashes(*hash, *values_hash);
                }
            }
        }
    } else {
        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
            let hash = &mut hashes_buffer[i];
            for values_hash in &values_hashes
                [start.as_usize() - first_offset..stop.as_usize() - first_offset]
            {
                *hash = combine_hashes(*hash, *values_hash);
            }
        }
    }
    Ok(())
}
/// Hashes a `GenericListArray` (List / LargeList) by hashing the visible
/// slice of the child values once, then folding each row's window of
/// value hashes into that row's hash. Null rows keep their existing hash.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_list_array<OffsetSize>(
    array: &GenericListArray<OffsetSize>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()>
where
    OffsetSize: OffsetSizeTrait,
{
    // First/last offsets bound the child elements actually referenced by
    // this (possibly sliced) array.
    let first_offset = array.value_offsets().first().cloned().unwrap_or_default();
    let last_offset = array.value_offsets().last().cloned().unwrap_or_default();
    // Number of child *elements* covered (despite the `bytes` in the name).
    let value_bytes_len = (last_offset - first_offset).as_usize();
    let mut values_hashes = vec![0u64; value_bytes_len];
    create_hashes(
        [array
            .values()
            .slice(first_offset.as_usize(), value_bytes_len)],
        random_state,
        &mut values_hashes,
    )?;
    if array.null_count() > 0 {
        // Adjacent offset pairs delimit each row's elements; ranges are
        // re-based by `first_offset` to index the sliced hash vector.
        for (i, (start, stop)) in array.value_offsets().iter().tuple_windows().enumerate()
        {
            if array.is_valid(i) {
                let hash = &mut hashes_buffer[i];
                for values_hash in &values_hashes[(*start - first_offset).as_usize()
                    ..(*stop - first_offset).as_usize()]
                {
                    *hash = combine_hashes(*hash, *values_hash);
                }
            }
        }
    } else {
        for ((start, stop), hash) in array
            .value_offsets()
            .iter()
            .tuple_windows()
            .zip(hashes_buffer.iter_mut())
        {
            for values_hash in &values_hashes
                [(*start - first_offset).as_usize()..(*stop - first_offset).as_usize()]
            {
                *hash = combine_hashes(*hash, *values_hash);
            }
        }
    }
    Ok(())
}
/// Hashes a `GenericListViewArray` (ListView / LargeListView): the child
/// values are hashed once, then each row's (offset, size) window of value
/// hashes is folded into that row's hash. Null rows keep their existing
/// hash.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_list_view_array<OffsetSize>(
    array: &GenericListViewArray<OffsetSize>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()>
where
    OffsetSize: OffsetSizeTrait,
{
    let child_values = array.values();
    let mut value_hashes = vec![0u64; child_values.len()];
    create_hashes([child_values], random_state, &mut value_hashes)?;
    let validity = array.nulls();
    let windows = array.value_offsets().iter().zip(array.value_sizes().iter());
    for (row, (offset, size)) in windows.enumerate() {
        // Null rows are skipped and keep whatever hash they already have.
        if validity.is_some_and(|n| n.is_null(row)) {
            continue;
        }
        let start = offset.as_usize();
        let end = start + size.as_usize();
        let row_hash = &mut hashes_buffer[row];
        for value_hash in &value_hashes[start..end] {
            *row_hash = combine_hashes(*row_hash, *value_hash);
        }
    }
    Ok(())
}
/// Hashes a `UnionArray`, choosing a strategy based on its layout.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_union_array(
    array: &UnionArray,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()> {
    // A UnionArray's data type is always Union; anything else is a bug.
    let DataType::Union(union_fields, _mode) = array.data_type() else {
        unreachable!()
    };
    match array.is_dense() {
        true => hash_union_array_default(array, union_fields, random_state, hashes_buffer),
        false => hash_sparse_union_array(array, union_fields, random_state, hashes_buffer),
    }
}
/// Default union hashing: hash every child array fully, then for each row
/// combine in the hash of the child entry selected by the row's type id
/// and value offset.
///
/// A `HashMap` keyed by type id is used because union type ids need not
/// be contiguous or start at zero.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_union_array_default(
    array: &UnionArray,
    union_fields: &UnionFields,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()> {
    let mut child_hashes: HashMap<i8, Vec<u64>> =
        HashMap::with_capacity(union_fields.len());
    for (type_id, _field) in union_fields.iter() {
        let child = array.child(type_id);
        let mut child_hash_buffer = vec![0; child.len()];
        create_hashes([child], random_state, &mut child_hash_buffer)?;
        child_hashes.insert(type_id, child_hash_buffer);
    }
    #[expect(clippy::needless_range_loop)]
    for i in 0..array.len() {
        let type_id = array.type_id(i);
        let child_offset = array.value_offset(i);
        let child_hash = child_hashes.get(&type_id).expect("invalid type_id");
        hashes_buffer[i] = combine_hashes(hashes_buffer[i], child_hash[child_offset]);
    }
    Ok(())
}
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_sparse_union_array(
array: &UnionArray,
union_fields: &UnionFields,
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
use std::collections::HashMap;
if union_fields.len() <= 2 {
return hash_union_array_default(
array,
union_fields,
random_state,
hashes_buffer,
);
}
let type_ids = array.type_ids();
let mut indices_by_type: HashMap<i8, Vec<u32>> = HashMap::new();
for (i, &type_id) in type_ids.iter().enumerate() {
indices_by_type.entry(type_id).or_default().push(i as u32);
}
for (type_id, _field) in union_fields.iter() {
if let Some(indices) = indices_by_type.get(&type_id) {
if indices.is_empty() {
continue;
}
let child = array.child(type_id);
let indices_array = UInt32Array::from(indices.clone());
let filtered = take(child.as_ref(), &indices_array, None)?;
let mut filtered_hashes = vec![0u64; filtered.len()];
create_hashes([&filtered], random_state, &mut filtered_hashes)?;
for (hash, &idx) in filtered_hashes.iter().zip(indices.iter()) {
hashes_buffer[idx as usize] =
combine_hashes(hashes_buffer[idx as usize], *hash);
}
}
}
Ok(())
}
/// Hashes a `FixedSizeListArray` by hashing the child values once, then
/// folding each row's fixed-width window of value hashes into that row's
/// hash. Null rows keep their existing hash.
///
/// NOTE(review): row `i` is mapped to child elements
/// `[i * value_length, (i + 1) * value_length)` without accounting for a
/// possible non-zero array offset — confirm sliced `FixedSizeListArray`s
/// are normalized before reaching this function.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_fixed_list_array(
    array: &FixedSizeListArray,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()> {
    let values = array.values();
    let value_length = array.value_length() as usize;
    let nulls = array.nulls();
    let mut values_hashes = vec![0u64; values.len()];
    create_hashes([values], random_state, &mut values_hashes)?;
    if let Some(nulls) = nulls {
        for i in 0..array.len() {
            if nulls.is_valid(i) {
                let hash = &mut hashes_buffer[i];
                for values_hash in
                    &values_hashes[i * value_length..(i + 1) * value_length]
                {
                    *hash = combine_hashes(*hash, *values_hash);
                }
            }
        }
    } else {
        for i in 0..array.len() {
            let hash = &mut hashes_buffer[i];
            for values_hash in &values_hashes[i * value_length..(i + 1) * value_length] {
                *hash = combine_hashes(*hash, *values_hash);
            }
        }
    }
    Ok(())
}
/// Hashes a run-end-encoded array: each physical run value is hashed
/// once, then that hash is applied to every logical row in the run.
///
/// Monomorphized over null values and single/multi-column mode (`REHASH`)
/// so the per-run loops carry no runtime flag checks. Rows in null-valued
/// runs keep their existing hash.
#[inline(never)]
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_run_array_inner<
    R: RunEndIndexType,
    const HAS_NULL_VALUES: bool,
    const REHASH: bool,
>(
    array: &RunArray<R>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()> {
    let array_offset = array.offset();
    let array_len = array.len();
    if array_len == 0 {
        return Ok(());
    }
    let run_ends = array.run_ends();
    let run_ends_values = run_ends.values();
    let values = array.values();
    // Physical runs overlapping the logical window [offset, offset + len);
    // the end index is inclusive, hence the +1 for an exclusive bound.
    let start_physical_index = array.get_start_physical_index();
    let end_physical_index = array.get_end_physical_index() + 1;
    let sliced_values = values.slice(
        start_physical_index,
        end_physical_index - start_physical_index,
    );
    // One hash per physical run value.
    let mut values_hashes = vec![0u64; sliced_values.len()];
    create_hashes(
        std::slice::from_ref(&sliced_values),
        random_state,
        &mut values_hashes,
    )?;
    let mut start_in_slice = 0;
    for (adjusted_physical_index, &absolute_run_end) in run_ends_values
        [start_physical_index..end_physical_index]
        .iter()
        .enumerate()
    {
        // Run ends are absolute logical positions in the unsliced array:
        // re-base by the array offset and clamp to the visible length.
        let absolute_run_end = absolute_run_end.as_usize();
        let end_in_slice = (absolute_run_end - array_offset).min(array_len);
        if HAS_NULL_VALUES && sliced_values.is_null(adjusted_physical_index) {
            // Null run: leave the rows untouched, but still advance.
            start_in_slice = end_in_slice;
            continue;
        }
        let value_hash = values_hashes[adjusted_physical_index];
        let run_slice = &mut hashes_buffer[start_in_slice..end_in_slice];
        if REHASH {
            for hash in run_slice.iter_mut() {
                *hash = combine_hashes(value_hash, *hash);
            }
        } else {
            run_slice.fill(value_hash);
        }
        start_in_slice = end_in_slice;
    }
    Ok(())
}
/// Hashes a run-end-encoded array, dispatching on (nullable values,
/// rehash) to monomorphized implementations of the run loop.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_run_array<R: RunEndIndexType>(
    array: &RunArray<R>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) -> Result<()> {
    let nullable_values = array.values().null_count() != 0;
    match (nullable_values, rehash) {
        (false, false) => {
            hash_run_array_inner::<R, false, false>(array, random_state, hashes_buffer)
        }
        (false, true) => {
            hash_run_array_inner::<R, false, true>(array, random_state, hashes_buffer)
        }
        (true, false) => {
            hash_run_array_inner::<R, true, false>(array, random_state, hashes_buffer)
        }
        (true, true) => {
            hash_run_array_inner::<R, true, true>(array, random_state, hashes_buffer)
        }
    }
}
/// Dispatches hashing of one array to the type-specific implementation.
///
/// With `rehash == true` each row's hash is combined into the existing
/// entry of `hashes_buffer`; with `rehash == false` it overwrites it.
/// Nested types (struct/list/map/union/fixed-size list) always combine,
/// since their element hashes come from recursive `create_hashes` calls.
/// Returns an internal error for unsupported data types.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_single_array(
    array: &dyn Array,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) -> Result<()> {
    // `downcast_primitive_array!` covers every primitive type in its first
    // arm; the remaining DataType arms handle non-primitive layouts.
    downcast_primitive_array! {
        array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
        DataType::Null => hash_null(random_state, hashes_buffer, rehash),
        DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash),
        DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash),
        DataType::Utf8View => hash_generic_byte_view_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
        DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash),
        DataType::Binary => hash_array(&as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
        DataType::BinaryView => hash_generic_byte_view_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
        DataType::LargeBinary => hash_array(&as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
        DataType::FixedSizeBinary(_) => {
            let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
            hash_array(&array, random_state, hashes_buffer, rehash)
        }
        DataType::Dictionary(_, _) => downcast_dictionary_array! {
            array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
            _ => unreachable!()
        }
        DataType::Struct(_) => {
            let array = as_struct_array(array)?;
            hash_struct_array(array, random_state, hashes_buffer)?;
        }
        DataType::List(_) => {
            let array = as_list_array(array)?;
            hash_list_array(array, random_state, hashes_buffer)?;
        }
        DataType::LargeList(_) => {
            let array = as_large_list_array(array)?;
            hash_list_array(array, random_state, hashes_buffer)?;
        }
        DataType::ListView(_) => {
            let array = as_list_view_array(array)?;
            hash_list_view_array(array, random_state, hashes_buffer)?;
        }
        DataType::LargeListView(_) => {
            let array = as_large_list_view_array(array)?;
            hash_list_view_array(array, random_state, hashes_buffer)?;
        }
        DataType::Map(_, _) => {
            let array = as_map_array(array)?;
            hash_map_array(array, random_state, hashes_buffer)?;
        }
        DataType::FixedSizeList(_,_) => {
            let array = as_fixed_size_list_array(array)?;
            hash_fixed_list_array(array, random_state, hashes_buffer)?;
        }
        DataType::Union(_, _) => {
            let array = as_union_array(array)?;
            hash_union_array(array, random_state, hashes_buffer)?;
        }
        DataType::RunEndEncoded(_, _) => downcast_run_array! {
            array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
            _ => unreachable!()
        }
        _ => {
            return _internal_err!(
                "Unsupported data type in hasher: {}",
                array.data_type()
            );
        }
    }
    Ok(())
}
/// Degenerate hasher compiled in when `force_hash_collisions` is enabled:
/// every row of every array hashes to zero, forcing all rows to collide
/// (used to exercise collision-handling code paths).
#[cfg(feature = "force_hash_collisions")]
fn hash_single_array(
    _array: &dyn Array,
    _random_state: &RandomState,
    hashes_buffer: &mut [u64],
    _rehash: bool,
) -> Result<()> {
    hashes_buffer.fill(0);
    Ok(())
}
/// Abstraction over the many ways callers hold arrays (`&dyn Array`,
/// `ArrayRef`, references to either), so `create_hashes` can accept any
/// iterator of them.
pub trait AsDynArray {
    /// Borrows `self` as a `&dyn Array`.
    fn as_dyn_array(&self) -> &dyn Array;
}
impl AsDynArray for dyn Array {
    fn as_dyn_array(&self) -> &dyn Array {
        self
    }
}
impl AsDynArray for &dyn Array {
    fn as_dyn_array(&self) -> &dyn Array {
        *self
    }
}
impl AsDynArray for ArrayRef {
    fn as_dyn_array(&self) -> &dyn Array {
        self.as_ref()
    }
}
impl AsDynArray for &ArrayRef {
    fn as_dyn_array(&self) -> &dyn Array {
        self.as_ref()
    }
}
/// Hashes each row across all `arrays` into `hashes_buffer`, returning
/// the buffer for convenience.
///
/// The first array initializes each row's hash; every subsequent array is
/// combined into it. Each array must have the same length as
/// `hashes_buffer`.
pub fn create_hashes<'a, I, T>(
    arrays: I,
    random_state: &RandomState,
    hashes_buffer: &'a mut [u64],
) -> Result<&'a mut [u64]>
where
    I: IntoIterator<Item = T>,
    T: AsDynArray,
{
    let mut first_column = true;
    for array in arrays {
        // Only the first column overwrites; later ones fold into it.
        hash_single_array(
            array.as_dyn_array(),
            random_state,
            hashes_buffer,
            !first_column,
        )?;
        first_column = false;
    }
    Ok(hashes_buffer)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::datatypes::*;
use super::*;
#[test]
fn create_hashes_for_decimal_array() -> Result<()> {
    // A Decimal128 column should produce exactly one hash per row.
    let decimals = (1..=4)
        .map(Some)
        .collect::<Decimal128Array>()
        .with_precision_and_scale(20, 3)
        .unwrap();
    let array: ArrayRef = Arc::new(decimals);
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut buffer = vec![0; array.len()];
    let hashes = create_hashes(&[array], &random_state, &mut buffer)?;
    assert_eq!(hashes.len(), 4);
    Ok(())
}
/// Fix: renamed from `create_hashes_for_empty_fixed_size_lit` — "lit" was
/// a typo for "list" (the test hashes an empty `FixedSizeList` array).
#[test]
fn create_hashes_for_empty_fixed_size_list() -> Result<()> {
    // An empty FixedSizeList array hashes into an empty buffer, no error.
    let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let hashes_buff = &mut [0; 0];
    let hashes = create_hashes(
        &[Arc::new(empty_array) as ArrayRef],
        &random_state,
        hashes_buff,
    )?;
    assert_eq!(hashes, &Vec::<u64>::new());
    Ok(())
}
#[test]
fn create_hashes_for_float_arrays() -> Result<()> {
    // Both f32 and f64 columns should yield one hash per row.
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let f32_arr: ArrayRef =
        Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
    let f64_arr: ArrayRef =
        Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
    let mut buffer = vec![0; f32_arr.len()];
    assert_eq!(create_hashes(&[f32_arr], &random_state, &mut buffer)?.len(), 4);
    assert_eq!(create_hashes(&[f64_arr], &random_state, &mut buffer)?.len(), 4);
    Ok(())
}
// Generates a test (`$NAME`) that hashes a binary-like array built from
// `$ARRAY` and checks that non-null rows get a non-zero hash while null
// rows keep the initial 0. Inputs mix short (inline-able) and long
// (>12 byte) values to cover both byte-view paths.
macro_rules! create_hash_binary {
    ($NAME:ident, $ARRAY:ty) => {
        #[cfg(not(feature = "force_hash_collisions"))]
        #[test]
        fn $NAME() {
            let binary = [
                Some(b"short".to_byte_slice()),
                None,
                Some(b"long but different 12 bytes string"),
                Some(b"short2"),
                Some(b"Longer than 12 bytes string"),
                Some(b"short"),
                Some(b"Longer than 12 bytes string"),
            ];
            let binary_array: ArrayRef =
                Arc::new(binary.iter().cloned().collect::<$ARRAY>());
            let random_state = RandomState::with_seeds(0, 0, 0, 0);
            let mut binary_hashes = vec![0; binary.len()];
            create_hashes(&[binary_array], &random_state, &mut binary_hashes)
                .unwrap();
            for (val, hash) in binary.iter().zip(binary_hashes.iter()) {
                match val {
                    Some(_) => assert_ne!(*hash, 0),
                    None => assert_eq!(*hash, 0),
                }
            }
            // Sanity-check the fixture itself: duplicates and a distinct pair.
            assert_eq!(binary[0], binary[5]);
            assert_eq!(binary[4], binary[6]);
            assert_ne!(binary[0], binary[2]);
        }
    };
}
create_hash_binary!(binary_array, BinaryArray);
create_hash_binary!(large_binary_array, LargeBinaryArray);
create_hash_binary!(binary_view_array, BinaryViewArray);
#[test]
fn create_hashes_fixed_size_binary() -> Result<()> {
    // Three rows of 2-byte fixed-size binary values -> three hashes.
    let rows = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
    let array: ArrayRef =
        Arc::new(FixedSizeBinaryArray::try_from_iter(rows.into_iter()).unwrap());
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut buffer = vec![0; array.len()];
    let hashes = create_hashes(&[array], &random_state, &mut buffer)?;
    assert_eq!(hashes.len(), 3);
    Ok(())
}
// Generates a test (`$NAME`) that hashes a string-like array built from
// `$ARRAY` and checks: non-null rows get a non-zero hash, null rows keep
// 0, and the hashes match those of the equivalent dictionary-encoded
// array (dictionary encoding must be hash-transparent).
macro_rules! create_hash_string {
    ($NAME:ident, $ARRAY:ty) => {
        #[cfg(not(feature = "force_hash_collisions"))]
        #[test]
        fn $NAME() {
            let strings = [
                Some("short"),
                None,
                Some("long but different 12 bytes string"),
                Some("short2"),
                Some("Longer than 12 bytes string"),
                Some("short"),
                Some("Longer than 12 bytes string"),
            ];
            let string_array: ArrayRef =
                Arc::new(strings.iter().cloned().collect::<$ARRAY>());
            let dict_array: ArrayRef = Arc::new(
                strings
                    .iter()
                    .cloned()
                    .collect::<DictionaryArray<Int8Type>>(),
            );
            let random_state = RandomState::with_seeds(0, 0, 0, 0);
            let mut string_hashes = vec![0; strings.len()];
            create_hashes(&[string_array], &random_state, &mut string_hashes)
                .unwrap();
            let mut dict_hashes = vec![0; strings.len()];
            create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();
            for (val, hash) in strings.iter().zip(string_hashes.iter()) {
                match val {
                    Some(_) => assert_ne!(*hash, 0),
                    None => assert_eq!(*hash, 0),
                }
            }
            assert_eq!(string_hashes, dict_hashes);
            // Sanity-check the fixture itself: duplicates and a distinct pair.
            assert_eq!(strings[0], strings[5]);
            assert_eq!(strings[4], strings[6]);
            assert_ne!(strings[0], strings[2]);
        }
    };
}
create_hash_string!(string_array, StringArray);
create_hash_string!(large_string_array, LargeStringArray);
// Fixed: previously instantiated with `StringArray`, which merely
// duplicated the `string_array` test and left `StringViewArray` untested.
create_hash_string!(string_view_array, StringViewArray);
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_run_array() -> Result<()> {
    // values [10, 20, 30] with run ends [2, 5, 7] decode to the logical
    // sequence [10, 10, 20, 20, 20, 30, 30].
    let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let hashes_buff = &mut vec![0; array.len()];
    let hashes = create_hashes(
        &[Arc::clone(&array) as ArrayRef],
        &random_state,
        hashes_buff,
    )?;
    assert_eq!(hashes.len(), 7);
    // Rows within a run hash equal...
    assert_eq!(hashes[0], hashes[1]);
    assert_eq!(hashes[2], hashes[3]);
    assert_eq!(hashes[3], hashes[4]);
    assert_eq!(hashes[5], hashes[6]);
    // ...and distinct run values hash differently.
    assert_ne!(hashes[0], hashes[2]);
    assert_ne!(hashes[2], hashes[5]);
    assert_ne!(hashes[0], hashes[5]);
    Ok(())
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_with_run_array() -> Result<()> {
    // Hashing (Int32, RunArray) together must differ from hashing the
    // Int32 column alone, while preserving distinctness relationships.
    let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));
    let values = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let run_array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut one_col_hashes = vec![0; int_array.len()];
    create_hashes(
        &[Arc::clone(&int_array) as ArrayRef],
        &random_state,
        &mut one_col_hashes,
    )?;
    let mut two_col_hashes = vec![0; int_array.len()];
    create_hashes(
        &[
            Arc::clone(&int_array) as ArrayRef,
            Arc::clone(&run_array) as ArrayRef,
        ],
        &random_state,
        &mut two_col_hashes,
    )?;
    assert_eq!(one_col_hashes.len(), 7);
    assert_eq!(two_col_hashes.len(), 7);
    // Adding a second column must change the combined hashes.
    assert_ne!(one_col_hashes, two_col_hashes);
    // Rows distinct in one column stay distinct with the run column added
    // (the run values agree within each compared pair of rows).
    let diff_0_vs_1_one_col = one_col_hashes[0] != one_col_hashes[1];
    let diff_0_vs_1_two_col = two_col_hashes[0] != two_col_hashes[1];
    assert_eq!(diff_0_vs_1_one_col, diff_0_vs_1_two_col);
    let diff_2_vs_3_one_col = one_col_hashes[2] != one_col_hashes[3];
    let diff_2_vs_3_two_col = two_col_hashes[2] != two_col_hashes[3];
    assert_eq!(diff_2_vs_3_one_col, diff_2_vs_3_two_col);
    Ok(())
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_dict_arrays() {
    // Dictionary encoding must be hash-transparent: a dictionary array
    // hashes identically to the plain array with the same logical values.
    let strings = [Some("foo"), None, Some("bar"), Some("foo"), None];
    let string_array: ArrayRef =
        Arc::new(strings.iter().cloned().collect::<StringArray>());
    let dict_array: ArrayRef = Arc::new(
        strings
            .iter()
            .cloned()
            .collect::<DictionaryArray<Int8Type>>(),
    );
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut string_hashes = vec![0; strings.len()];
    create_hashes(&[string_array], &random_state, &mut string_hashes).unwrap();
    let mut dict_hashes = vec![0; strings.len()];
    create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();
    // Non-null rows hash non-zero; null rows keep the initial 0.
    for (val, hash) in strings.iter().zip(string_hashes.iter()) {
        match val {
            Some(_) => assert_ne!(*hash, 0),
            None => assert_eq!(*hash, 0),
        }
    }
    assert_eq!(string_hashes, dict_hashes);
    // Equal logical values hash equal; distinct values hash differently.
    assert_eq!(strings[1], strings[4]);
    assert_eq!(dict_hashes[1], dict_hashes[4]);
    assert_eq!(strings[0], strings[3]);
    assert_eq!(dict_hashes[0], dict_hashes[3]);
    assert_ne!(strings[0], strings[2]);
    assert_ne!(dict_hashes[0], dict_hashes[2]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_list_arrays() {
    // Equal list rows must hash equal; null rows and the empty list both
    // leave their hash at the initial 0.
    let rows = vec![
        Some(vec![Some(0), Some(1), Some(2)]),
        None,
        Some(vec![Some(3), None, Some(5)]),
        Some(vec![Some(3), None, Some(5)]),
        None,
        Some(vec![Some(0), Some(1), Some(2)]),
        Some(vec![]),
    ];
    let list_array =
        Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(rows)) as ArrayRef;
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; list_array.len()];
    create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
    assert_eq!(hashes[0], hashes[5]); // identical lists
    assert_eq!(hashes[1], hashes[4]); // both null rows
    assert_eq!(hashes[2], hashes[3]); // identical lists containing a null
    assert_eq!(hashes[1], hashes[6]); // null row and empty list both stay 0
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_sliced_list_arrays() {
    // Slicing must not change row hashes. Rows 2..5 of the parent are
    // [3, null, 5], [3, null, 5], null.
    let rows = vec![
        Some(vec![Some(0), Some(1), Some(2)]),
        None,
        Some(vec![Some(3), None, Some(5)]),
        Some(vec![Some(3), None, Some(5)]),
        None,
        Some(vec![Some(0), Some(1), Some(2)]),
        Some(vec![]),
    ];
    let parent =
        Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(rows)) as ArrayRef;
    let sliced = parent.slice(2, 3);
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; sliced.len()];
    create_hashes(&[sliced], &random_state, &mut hashes).unwrap();
    assert_eq!(hashes[0], hashes[1]); // identical lists
    assert_ne!(hashes[1], hashes[2]); // non-null row vs null row
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_list_view_arrays() {
    use arrow::buffer::{NullBuffer, ScalarBuffer};
    // Rows encoded via offsets/sizes/nulls: [0,1,2], null, [3,null,5],
    // [3,null,5], null, [0,1,2], [].
    let child_values = Arc::new(Int32Array::from(vec![
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        None,
        Some(5),
    ])) as ArrayRef;
    let field = Arc::new(Field::new("item", DataType::Int32, true));
    let offsets = ScalarBuffer::from(vec![0i32, 0, 3, 3, 0, 0, 0]);
    let sizes = ScalarBuffer::from(vec![3i32, 0, 3, 3, 0, 3, 0]);
    let validity = Some(NullBuffer::from(vec![
        true, false, true, true, false, true, true,
    ]));
    let list_view_array = Arc::new(ListViewArray::new(
        field,
        offsets,
        sizes,
        child_values,
        validity,
    )) as ArrayRef;
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; list_view_array.len()];
    create_hashes(&[list_view_array], &random_state, &mut hashes).unwrap();
    // Equal rows hash equal (null rows and the empty list all stay 0).
    assert_eq!(hashes[0], hashes[5]);
    assert_eq!(hashes[1], hashes[4]);
    assert_eq!(hashes[2], hashes[3]);
    assert_eq!(hashes[1], hashes[6]);
    // Distinct non-null rows hash differently.
    assert_ne!(hashes[0], hashes[2]);
    assert_ne!(hashes[0], hashes[6]);
    assert_ne!(hashes[2], hashes[6]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_large_list_view_arrays() {
    use arrow::buffer::{NullBuffer, ScalarBuffer};
    // Same layout as the i32 list-view test, but with 64-bit offsets/sizes.
    let child: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        None,
        Some(5),
    ]));
    let item_field = Arc::new(Field::new("item", DataType::Int32, true));
    // Logical rows: [0,1,2], null, [3,null,5], [3,null,5], null, [0,1,2], [].
    let offsets = ScalarBuffer::from(vec![0i64, 0, 3, 3, 0, 0, 0]);
    let sizes = ScalarBuffer::from(vec![3i64, 0, 3, 3, 0, 3, 0]);
    let validity =
        NullBuffer::from(vec![true, false, true, true, false, true, true]);
    let array: ArrayRef = Arc::new(LargeListViewArray::new(
        item_field,
        offsets,
        sizes,
        child,
        Some(validity),
    ));
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; array.len()];
    create_hashes(&[array], &random_state, &mut hashes).unwrap();
    // Equal views hash equal; null (1, 4) matches empty (6).
    assert_eq!(hashes[0], hashes[5]);
    assert_eq!(hashes[1], hashes[4]);
    assert_eq!(hashes[2], hashes[3]);
    assert_eq!(hashes[1], hashes[6]);
    // Distinct non-null views hash differently.
    assert_ne!(hashes[0], hashes[2]);
    assert_ne!(hashes[0], hashes[6]);
    assert_ne!(hashes[2], hashes[6]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_fixed_size_list_arrays() {
    // Fixed width 3; duplicates at (0, 5) and (2, 3), nulls at (1, 4).
    let rows = vec![
        Some(vec![Some(0), Some(1), Some(2)]),
        None,
        Some(vec![Some(3), None, Some(5)]),
        Some(vec![Some(3), None, Some(5)]),
        None,
        Some(vec![Some(0), Some(1), Some(2)]),
    ];
    let array: ArrayRef = Arc::new(
        FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(rows, 3),
    );
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; array.len()];
    create_hashes(&[array], &random_state, &mut hashes).unwrap();
    // Equal rows (and equal null rows) must hash equal.
    assert_eq!(hashes[0], hashes[5]);
    assert_eq!(hashes[1], hashes[4]);
    assert_eq!(hashes[2], hashes[3]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_struct_arrays() {
    use arrow::buffer::Buffer;
    let boolarr = Arc::new(BooleanArray::from(vec![
        false, false, true, true, true, true,
    ]));
    let i32arr = Arc::new(Int32Array::from(vec![10, 10, 20, 20, 30, 31]));
    // Four fields built from only two child arrays; the duplicate field
    // names ("bool", "i32" each appear twice) are intentional, so hashing
    // must walk children positionally rather than by name.
    let struct_array = StructArray::from((
        vec![
            (
                Arc::new(Field::new("bool", DataType::Boolean, false)),
                Arc::clone(&boolarr) as ArrayRef,
            ),
            (
                Arc::new(Field::new("i32", DataType::Int32, false)),
                Arc::clone(&i32arr) as ArrayRef,
            ),
            (
                Arc::new(Field::new("i32", DataType::Int32, false)),
                Arc::clone(&i32arr) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bool", DataType::Boolean, false)),
                Arc::clone(&boolarr) as ArrayRef,
            ),
        ],
        // Struct-level validity bitmap 0b001011 (LSB = row 0):
        // rows 0, 1, 3 valid; rows 2, 4, 5 null.
        Buffer::from(&[0b001011]),
    ));
    // Sanity-check the bitmap decoded as intended before hashing.
    assert!(struct_array.is_valid(0));
    assert!(struct_array.is_valid(1));
    assert!(struct_array.is_null(2));
    assert!(struct_array.is_valid(3));
    assert!(struct_array.is_null(4));
    assert!(struct_array.is_null(5));
    let array = Arc::new(struct_array) as ArrayRef;
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; array.len()];
    create_hashes(&[array], &random_state, &mut hashes).unwrap();
    // Rows 0 and 1: both valid with identical children -> equal hashes.
    assert_eq!(hashes[0], hashes[1]);
    // Rows 2 and 3 have identical children, but 2 is null and 3 is valid.
    assert_ne!(hashes[2], hashes[3]);
    // Rows 4 and 5 are both null -> equal hashes even though the child
    // values differ (30 vs 31).
    assert_eq!(hashes[4], hashes[5]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_struct_arrays_more_column_than_row() {
    // 4 columns x 2 rows: exercises the path where the column count
    // exceeds the row count.
    let int_col = || Arc::new(Int32Array::from(vec![10, 10])) as ArrayRef;
    let struct_array = StructArray::from(vec![
        (
            Arc::new(Field::new("bool", DataType::Boolean, false)),
            Arc::new(BooleanArray::from(vec![false, false])) as ArrayRef,
        ),
        (Arc::new(Field::new("i32-1", DataType::Int32, false)), int_col()),
        (Arc::new(Field::new("i32-2", DataType::Int32, false)), int_col()),
        (Arc::new(Field::new("i32-3", DataType::Int32, false)), int_col()),
    ]);
    assert!(struct_array.is_valid(0));
    assert!(struct_array.is_valid(1));
    let array = Arc::new(struct_array) as ArrayRef;
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; array.len()];
    create_hashes(&[array], &random_state, &mut hashes).unwrap();
    // The two rows are identical across every column.
    assert_eq!(hashes[0], hashes[1]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_map_arrays() {
    let mut builder =
        MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
    // Row 0: {key1: 1, key2: 2}
    builder.keys().append_value("key1");
    builder.keys().append_value("key2");
    builder.values().append_value(1);
    builder.values().append_value(2);
    builder.append(true).unwrap();
    // Row 1: {key1: 1, key2: 2} — identical to row 0
    builder.keys().append_value("key1");
    builder.keys().append_value("key2");
    builder.values().append_value(1);
    builder.values().append_value(2);
    builder.append(true).unwrap();
    // Row 2: {key1: 1, key2: 3} — one value differs from row 0
    builder.keys().append_value("key1");
    builder.keys().append_value("key2");
    builder.values().append_value(1);
    builder.values().append_value(3);
    builder.append(true).unwrap();
    // Row 3: {key1: 1, key3: 2} — one key differs from row 0
    builder.keys().append_value("key1");
    builder.keys().append_value("key3");
    builder.values().append_value(1);
    builder.values().append_value(2);
    builder.append(true).unwrap();
    // Row 4: {key1: 1} — fewer entries
    builder.keys().append_value("key1");
    builder.values().append_value(1);
    builder.append(true).unwrap();
    // Row 5: {key1: null} — null entry value
    builder.keys().append_value("key1");
    builder.values().append_null();
    builder.append(true).unwrap();
    // Row 6: {} — empty map
    builder.append(true).unwrap();
    // Row 7: null map (an entry is appended but the row is marked invalid)
    builder.keys().append_value("key1");
    builder.values().append_value(1);
    builder.append(false).unwrap();
    let array = Arc::new(builder.finish()) as ArrayRef;
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; array.len()];
    create_hashes(&[array], &random_state, &mut hashes).unwrap();
    // Identical maps hash equal; any key/value/length difference changes
    // the hash; a null value differs from a present value; an empty map
    // and a null map currently hash the same.
    assert_eq!(hashes[0], hashes[1]);
    assert_ne!(hashes[0], hashes[2]);
    assert_ne!(hashes[0], hashes[3]);
    assert_ne!(hashes[0], hashes[4]);
    assert_ne!(hashes[4], hashes[5]);
    assert_eq!(hashes[6], hashes[7]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_for_dict_arrays() {
    let plain_values = [Some("foo"), None, Some("bar")];
    let dict_values = [Some("blarg"), Some("blah"), None];
    let plain_col: ArrayRef =
        Arc::new(plain_values.iter().cloned().collect::<StringArray>());
    let dict_col: ArrayRef = Arc::new(
        dict_values
            .iter()
            .cloned()
            .collect::<DictionaryArray<Int32Type>>(),
    );
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    // Hash the dictionary column alone.
    let mut one_col_hashes = vec![0; plain_values.len()];
    create_hashes(
        &[Arc::clone(&dict_col) as ArrayRef],
        &random_state,
        &mut one_col_hashes,
    )
    .unwrap();
    // Hash the same column combined with the plain string column.
    let mut two_col_hashes = vec![0; plain_values.len()];
    create_hashes(&[dict_col, plain_col], &random_state, &mut two_col_hashes)
        .unwrap();
    assert_eq!(one_col_hashes.len(), 3);
    assert_eq!(two_col_hashes.len(), 3);
    // Adding a column must change the combined hashes.
    assert_ne!(one_col_hashes, two_col_hashes);
}
#[test]
fn test_create_hashes_from_arrays() {
    // Smoke test: hashing a multi-column batch of owned ArrayRefs.
    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let floats: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut buffer = vec![0; ints.len()];
    let hashes =
        create_hashes(&[ints, floats], &random_state, &mut buffer).unwrap();
    assert_eq!(hashes.len(), 4);
}
#[test]
fn test_create_hashes_from_dyn_arrays() {
    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let floats: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
    // Exercise the `&dyn Array` entry point of create_hashes.
    fn check(a: &dyn Array, b: &dyn Array) {
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let mut buffer = vec![0; a.len()];
        let hashes = create_hashes([a, b], &random_state, &mut buffer).unwrap();
        assert_eq!(hashes.len(), 4);
    }
    check(&*ints, &*floats);
}
#[test]
fn test_create_hashes_equivalence() {
    // A slice of ArrayRef and an iterator of owned arrays must produce
    // identical hashes.
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut slice_hashes = vec![0; array.len()];
    create_hashes(&[Arc::clone(&array)], &random_state, &mut slice_hashes)
        .unwrap();
    let mut owned_hashes = vec![0; array.len()];
    create_hashes([array], &random_state, &mut owned_hashes).unwrap();
    assert_eq!(slice_hashes, owned_hashes);
}
#[test]
fn test_with_hashes() {
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    // Baseline: hashes computed directly via create_hashes.
    let mut expected = vec![0; array.len()];
    create_hashes([&array], &random_state, &mut expected).unwrap();
    // The callback must see the same hashes and can return a value.
    let result = with_hashes([&array], &random_state, |hashes| {
        assert_eq!(hashes.len(), 4);
        assert_eq!(hashes, &expected[..]);
        Ok(hashes.to_vec())
    })
    .unwrap();
    assert_eq!(result, expected);
}
#[test]
fn test_with_hashes_multi_column() {
    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let strs: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    // Baseline multi-column hashes via create_hashes.
    let mut expected = vec![0; ints.len()];
    create_hashes([&ints, &strs], &random_state, &mut expected).unwrap();
    // with_hashes must hand the callback identical values.
    with_hashes([&ints, &strs], &random_state, |hashes| {
        assert_eq!(hashes.len(), 3);
        assert_eq!(hashes, &expected[..]);
        Ok(())
    })
    .unwrap();
}
#[test]
fn test_with_hashes_empty_arrays() {
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    // Zero input arrays: with_hashes cannot size its buffer and must error.
    let no_arrays: [&ArrayRef; 0] = [];
    let err = with_hashes(no_arrays, &random_state, |_hashes| Ok(()))
        .expect_err("with_hashes must reject an empty array set");
    assert!(err.to_string().contains("requires at least one array"));
}
#[test]
fn test_with_hashes_reentrancy() {
    let outer: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let inner: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    // Nesting with_hashes on the same thread must fail: the outer call
    // already holds the mutable borrow of the thread-local buffer.
    let result = with_hashes([&outer], &random_state, |_hashes| {
        with_hashes([&inner], &random_state, |_inner_hashes| Ok(()))
    });
    assert!(result.is_err());
    let err_msg = result.unwrap_err().to_string();
    assert!(
        err_msg.contains("reentrantly") || err_msg.contains("cannot be called"),
        "Error message should mention reentrancy: {err_msg}",
    );
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_sparse_union_arrays() {
    // Logical rows (type id picks the child): 5_i32, "foo", 10_i32, 5_i32.
    let ints = Int32Array::from(vec![Some(5), None, Some(10), Some(5)]);
    let strs = StringArray::from(vec![None, Some("foo"), None, None]);
    let type_ids = vec![0_i8, 1, 0, 0].into();
    let children = vec![
        Arc::new(ints) as ArrayRef,
        Arc::new(strs) as ArrayRef,
    ];
    let fields = [
        (0, Arc::new(Field::new("a", DataType::Int32, true))),
        (1, Arc::new(Field::new("b", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    // Sparse union: no offsets buffer.
    let union: ArrayRef =
        Arc::new(UnionArray::try_new(fields, type_ids, None, children).unwrap());
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; union.len()];
    create_hashes(&[union], &random_state, &mut hashes).unwrap();
    // Same variant + same value hash equal; anything else differs.
    assert_eq!(hashes[0], hashes[3]);
    assert_ne!(hashes[0], hashes[2]);
    assert_ne!(hashes[0], hashes[1]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_sparse_union_arrays_with_nulls() {
    // Logical rows: 5_i32, "foo", null (i32 branch), null (utf8 branch).
    let ints = Int32Array::from(vec![Some(5), None, None, None]);
    let strs = StringArray::from(vec![None, Some("foo"), None, None]);
    let type_ids = vec![0, 1, 0, 1].into();
    let children = vec![
        Arc::new(ints) as ArrayRef,
        Arc::new(strs) as ArrayRef,
    ];
    let fields = [
        (0, Arc::new(Field::new("a", DataType::Int32, true))),
        (1, Arc::new(Field::new("b", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let union: ArrayRef =
        Arc::new(UnionArray::try_new(fields, type_ids, None, children).unwrap());
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; union.len()];
    create_hashes(&[union], &random_state, &mut hashes).unwrap();
    // Nulls hash equal regardless of which branch they sit in,
    // and differ from the non-null rows.
    assert_eq!(hashes[2], hashes[3]);
    assert_ne!(hashes[0], hashes[2]);
    assert_ne!(hashes[1], hashes[3]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_dense_union_arrays() {
    let ints = Int32Array::from(vec![67, 100, 67]);
    let strs = StringArray::from(vec!["norm", "macdonald"]);
    // Logical rows: 67, "norm", 100, "macdonald", 67.
    let type_ids = vec![0, 1, 0, 1, 0].into();
    let offsets = vec![0, 0, 1, 1, 2].into();
    let children = vec![
        Arc::new(ints) as ArrayRef,
        Arc::new(strs) as ArrayRef,
    ];
    let fields = [
        (0, Arc::new(Field::new("a", DataType::Int32, false))),
        (1, Arc::new(Field::new("b", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();
    // Dense union: offsets index into the per-type child arrays.
    let union: ArrayRef = Arc::new(
        UnionArray::try_new(fields, type_ids, Some(offsets), children).unwrap(),
    );
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; union.len()];
    create_hashes(&[union], &random_state, &mut hashes).unwrap();
    // All distinct rows differ; the repeated 67 (rows 0 and 4) matches.
    assert_ne!(hashes[0], hashes[1]);
    assert_ne!(hashes[0], hashes[2]);
    assert_ne!(hashes[1], hashes[3]);
    assert_ne!(hashes[2], hashes[3]);
    assert_eq!(hashes[0], hashes[4]);
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_sliced_run_array() -> Result<()> {
    // REE array, logically: [10, 10, 20, 20, 20, 30, 30].
    let run_values = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let run_array =
        Arc::new(RunArray::try_new(&run_ends, run_values.as_ref()).unwrap());
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    // Hash the whole array first as a reference.
    let mut full_hashes = vec![0; run_array.len()];
    create_hashes(
        &[Arc::clone(&run_array) as ArrayRef],
        &random_state,
        &mut full_hashes,
    )?;
    // Slice covering logical rows 2..5, i.e. [20, 20, 20].
    let sliced = (Arc::clone(&run_array) as ArrayRef).slice(2, 3);
    let mut sliced_hashes = vec![0; sliced.len()];
    create_hashes(
        std::slice::from_ref(&sliced),
        &random_state,
        &mut sliced_hashes,
    )?;
    assert_eq!(sliced_hashes.len(), 3);
    assert_eq!(sliced_hashes[0], sliced_hashes[1]);
    assert_eq!(sliced_hashes[1], sliced_hashes[2]);
    // Slicing must not shift hashes relative to the unsliced array.
    assert_eq!(&sliced_hashes, &full_hashes[2..5]);
    Ok(())
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn test_run_array_with_nulls() -> Result<()> {
    // REE array, logically: [10, 10, null, null, 20, 20].
    let run_values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
    let run_ends = Arc::new(Int32Array::from(vec![2, 4, 6]));
    let run_array =
        Arc::new(RunArray::try_new(&run_ends, run_values.as_ref()).unwrap());
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; run_array.len()];
    create_hashes(
        &[Arc::clone(&run_array) as ArrayRef],
        &random_state,
        &mut hashes,
    )?;
    // First run: equal, non-zero hashes.
    assert_eq!(hashes[0], hashes[1]);
    assert_ne!(hashes[0], 0);
    // Null run: hashes stay at the initial 0.
    assert_eq!(hashes[2], hashes[3]);
    assert_eq!(hashes[2], 0);
    // Last run: equal, non-zero, and distinct from the first run.
    assert_eq!(hashes[4], hashes[5]);
    assert_ne!(hashes[4], 0);
    assert_ne!(hashes[0], hashes[4]);
    Ok(())
}
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn test_run_array_with_nulls_multicolumn() -> Result<()> {
    // The same logical column encoded two ways: plain primitive vs REE.
    let plain = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
    let ree_values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
    let ree_ends = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let ree = Arc::new(RunArray::try_new(&ree_ends, ree_values.as_ref()).unwrap());
    let extra = Arc::new(Int32Array::from(vec![100, 200, 300]));
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    // Multi-column hashes with the plain encoding.
    let mut plain_hashes = vec![0; 3];
    create_hashes(
        &[
            Arc::clone(&plain) as ArrayRef,
            Arc::clone(&extra) as ArrayRef,
        ],
        &random_state,
        &mut plain_hashes,
    )?;
    // Multi-column hashes with the run-end encoding.
    let mut ree_hashes = vec![0; 3];
    create_hashes(
        &[
            Arc::clone(&ree) as ArrayRef,
            Arc::clone(&extra) as ArrayRef,
        ],
        &random_state,
        &mut ree_hashes,
    )?;
    // The physical encoding must not affect the combined hashes.
    assert_eq!(plain_hashes, ree_hashes);
    Ok(())
}
}