use crate::error::DbxResult;
use ahash::RandomState;
use arrow::array::*;
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
/// Computes one 64-bit hash per row of `batch` over the selected `columns`.
///
/// Every row starts from `seed`; the columns are then folded in, in the order
/// given, through `update_hashes`. The same `seed` also keys the `ahash`
/// state (all four seed slots), so equal inputs with an equal seed produce
/// equal output.
///
/// An empty batch yields an empty array without looking at `columns` at all.
///
/// # Errors
///
/// Propagates any error returned by `update_hashes`.
///
/// # Panics
///
/// For a non-empty batch, an index in `columns` that is outside the batch's
/// column range panics inside `RecordBatch::column`.
pub fn hash_batch(batch: &RecordBatch, columns: &[usize], seed: u64) -> DbxResult<UInt64Array> {
    let row_count = batch.num_rows();
    let mut row_hashes: Vec<u64> = vec![seed; row_count];

    // Only touch the columns when there is at least one row to hash; this
    // keeps the empty-batch path free of any column lookups.
    if row_count > 0 {
        let state = RandomState::with_seeds(seed, seed, seed, seed);
        columns.iter().try_for_each(|&index| {
            update_hashes(batch.column(index), &mut row_hashes, &state)
        })?;
    }

    Ok(UInt64Array::from(row_hashes))
}
fn update_hashes(col: &ArrayRef, hashes: &mut [u64], hasher: &RandomState) -> DbxResult<()> {
let num_rows = col.len();
match col.data_type() {
DataType::Int32 => {
let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
for (i, hash) in hashes.iter_mut().enumerate().take(num_rows) {
if !arr.is_null(i) {
let val_hash = hasher.hash_one(arr.value(i));
*hash = combine_hashes(*hash, val_hash);
} else {
*hash = combine_hashes(*hash, 0);
}
}
}
DataType::Int64 => {
let arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
for (i, hash) in hashes.iter_mut().enumerate().take(num_rows) {
if !arr.is_null(i) {
let val_hash = hasher.hash_one(arr.value(i));
*hash = combine_hashes(*hash, val_hash);
} else {
*hash = combine_hashes(*hash, 0);
}
}
}
DataType::Float64 => {
let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
for (i, hash) in hashes.iter_mut().enumerate().take(num_rows) {
if !arr.is_null(i) {
let val_hash = hasher.hash_one(arr.value(i).to_bits());
*hash = combine_hashes(*hash, val_hash);
} else {
*hash = combine_hashes(*hash, 0);
}
}
}
DataType::Utf8 => {
let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
for (i, hash) in hashes.iter_mut().enumerate().take(num_rows) {
if !arr.is_null(i) {
let val_hash = hasher.hash_one(arr.value(i));
*hash = combine_hashes(*hash, val_hash);
} else {
*hash = combine_hashes(*hash, 0);
}
}
}
_ => {
for hash in hashes.iter_mut().take(num_rows) {
let val_hash = hasher.hash_one(format!("{:?}", col.as_any()));
*hash = combine_hashes(*hash, val_hash);
}
}
}
Ok(())
}
/// Mixes `h2` into the running hash `h1` and returns the new running hash.
///
/// The result is `h1 ^ (h2 + K + (h1 << 6) + (h1 >> 2))`, with all additions
/// wrapping modulo 2^64. The operation is order-sensitive: swapping the
/// arguments generally gives a different value.
#[inline]
fn combine_hashes(h1: u64, h2: u64) -> u64 {
    // 64-bit golden-ratio constant used by hash_combine-style mixers.
    const MIX_OFFSET: u64 = 0x9e37_79b9_7f4a_7c15;

    // Shifted copies of the running hash, spreading its bits both ways.
    let spread = (h1 << 6).wrapping_add(h1 >> 2);
    let mixed = h2.wrapping_add(MIX_OFFSET).wrapping_add(spread);

    h1 ^ mixed
}