dbx_core/sql/executor/
hash_utils.rs1use crate::error::DbxResult;
4use ahash::RandomState;
5use arrow::array::*;
6use arrow::datatypes::DataType;
7use arrow::record_batch::RecordBatch;
8
9pub fn hash_batch(batch: &RecordBatch, columns: &[usize], seed: u64) -> DbxResult<UInt64Array> {
12 let num_rows = batch.num_rows();
13 if num_rows == 0 {
14 return Ok(UInt64Array::from(Vec::<u64>::new()));
15 }
16
17 let hasher_state = RandomState::with_seeds(seed, seed, seed, seed);
18 let mut hashes = vec![seed; num_rows];
19
20 for &col_idx in columns {
21 let col = batch.column(col_idx);
22 update_hashes(col, &mut hashes, &hasher_state)?;
23 }
24
25 Ok(UInt64Array::from(hashes))
26}
27
28fn update_hashes(col: &ArrayRef, hashes: &mut [u64], hasher: &RandomState) -> DbxResult<()> {
30 let num_rows = col.len();
31
32 match col.data_type() {
33 DataType::Int32 => {
34 let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
35 for (i, hash) in hashes.iter_mut().enumerate().take(num_rows) {
36 if !arr.is_null(i) {
37 let val_hash = hasher.hash_one(arr.value(i));
38 *hash = combine_hashes(*hash, val_hash);
39 } else {
40 *hash = combine_hashes(*hash, 0);
41 }
42 }
43 }
44 DataType::Int64 => {
45 let arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
46 for (i, hash) in hashes.iter_mut().enumerate().take(num_rows) {
47 if !arr.is_null(i) {
48 let val_hash = hasher.hash_one(arr.value(i));
49 *hash = combine_hashes(*hash, val_hash);
50 } else {
51 *hash = combine_hashes(*hash, 0);
52 }
53 }
54 }
55 DataType::Float64 => {
56 let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
57 for (i, hash) in hashes.iter_mut().enumerate().take(num_rows) {
58 if !arr.is_null(i) {
59 let val_hash = hasher.hash_one(arr.value(i).to_bits());
60 *hash = combine_hashes(*hash, val_hash);
61 } else {
62 *hash = combine_hashes(*hash, 0);
63 }
64 }
65 }
66 DataType::Utf8 => {
67 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
68 for (i, hash) in hashes.iter_mut().enumerate().take(num_rows) {
69 if !arr.is_null(i) {
70 let val_hash = hasher.hash_one(arr.value(i));
71 *hash = combine_hashes(*hash, val_hash);
72 } else {
73 *hash = combine_hashes(*hash, 0);
74 }
75 }
76 }
77 _ => {
78 for hash in hashes.iter_mut().take(num_rows) {
80 let val_hash = hasher.hash_one(format!("{:?}", col.as_any()));
81 *hash = combine_hashes(*hash, val_hash);
82 }
83 }
84 }
85 Ok(())
86}
87
/// Mixes `h2` into the running hash `h1` and returns the combined value.
///
/// The result is `h1 ^ (h2 + K + (h1 << 6) + (h1 >> 2))` using wrapping
/// addition, where `K` is the 64-bit golden-ratio constant (2^64 / phi).
/// The operation is order-sensitive: swapping the arguments generally gives
/// a different result.
#[inline]
fn combine_hashes(h1: u64, h2: u64) -> u64 {
    const MIX_CONSTANT: u64 = 0x9e37_79b9_7f4a_7c15;

    // Spread bits of the running hash in both directions before mixing.
    let shifted_up = h1 << 6;
    let shifted_down = h1 >> 2;

    let mixed = h2
        .wrapping_add(MIX_CONSTANT)
        .wrapping_add(shifted_up)
        .wrapping_add(shifted_down);

    h1 ^ mixed
}