datafusion_comet_spark_expr/hash_funcs/
murmur3.rs1use crate::create_hashes_internal;
19use arrow::array::types::ArrowDictionaryKeyType;
20use arrow::array::{Array, ArrayRef, ArrowNativeTypeOp, DictionaryArray, Int32Array};
21use arrow::compute::take;
22use arrow::datatypes::ArrowNativeType;
23use datafusion::common::{internal_err, DataFusionError, ScalarValue};
24use datafusion::physical_plan::ColumnarValue;
25use std::sync::Arc;
26
27pub fn spark_murmur3_hash(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
29 let length = args.len();
30 let seed = &args[length - 1];
31 match seed {
32 ColumnarValue::Scalar(ScalarValue::Int32(Some(seed))) => {
33 let num_rows = args[0..args.len() - 1]
35 .iter()
36 .find_map(|arg| match arg {
37 ColumnarValue::Array(array) => Some(array.len()),
38 ColumnarValue::Scalar(_) => None,
39 })
40 .unwrap_or(1);
41 let mut hashes: Vec<u32> = vec![0_u32; num_rows];
42 hashes.fill(*seed as u32);
43 let arrays = args[0..args.len() - 1]
44 .iter()
45 .map(|arg| match arg {
46 ColumnarValue::Array(array) => Arc::clone(array),
47 ColumnarValue::Scalar(scalar) => {
48 scalar.clone().to_array_of_size(num_rows).unwrap()
49 }
50 })
51 .collect::<Vec<ArrayRef>>();
52 create_murmur3_hashes(&arrays, &mut hashes)?;
53 if num_rows == 1 {
54 Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(
55 hashes[0] as i32,
56 ))))
57 } else {
58 let hashes: Vec<i32> = hashes.into_iter().map(|x| x as i32).collect();
59 Ok(ColumnarValue::Array(Arc::new(Int32Array::from(hashes))))
60 }
61 }
62 _ => {
63 internal_err!(
64 "The seed of function murmur3_hash must be an Int32 scalar value, but got: {:?}.",
65 seed
66 )
67 }
68 }
69}
70
71#[inline]
73pub fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 {
74 #[inline]
75 fn mix_k1(mut k1: i32) -> i32 {
76 k1 = k1.mul_wrapping(0xcc9e2d51u32 as i32);
77 k1 = k1.rotate_left(15);
78 k1 = k1.mul_wrapping(0x1b873593u32 as i32);
79 k1
80 }
81
82 #[inline]
83 fn mix_h1(mut h1: i32, k1: i32) -> i32 {
84 h1 ^= k1;
85 h1 = h1.rotate_left(13);
86 h1 = h1.mul_wrapping(5).add_wrapping(0xe6546b64u32 as i32);
87 h1
88 }
89
90 #[inline]
91 fn fmix(mut h1: i32, len: i32) -> i32 {
92 h1 ^= len;
93 h1 ^= (h1 as u32 >> 16) as i32;
94 h1 = h1.mul_wrapping(0x85ebca6bu32 as i32);
95 h1 ^= (h1 as u32 >> 13) as i32;
96 h1 = h1.mul_wrapping(0xc2b2ae35u32 as i32);
97 h1 ^= (h1 as u32 >> 16) as i32;
98 h1
99 }
100
101 #[inline]
102 unsafe fn hash_bytes_by_int(data: &[u8], seed: u32) -> i32 {
103 let mut h1 = seed as i32;
105 for i in (0..data.len()).step_by(4) {
106 let ints = data.as_ptr().add(i) as *const i32;
107 let mut half_word = ints.read_unaligned();
108 if cfg!(target_endian = "big") {
109 half_word = half_word.reverse_bits();
110 }
111 h1 = mix_h1(h1, mix_k1(half_word));
112 }
113 h1
114 }
115 let data = data.as_ref();
116 let len = data.len();
117 let len_aligned = len - len % 4;
118
119 unsafe {
124 let mut h1 = if len_aligned > 0 {
125 hash_bytes_by_int(&data[0..len_aligned], seed)
126 } else {
127 seed as i32
128 };
129
130 for i in len_aligned..len {
131 let half_word = *data.get_unchecked(i) as i8 as i32;
132 h1 = mix_h1(h1, mix_k1(half_word));
133 }
134 fmix(h1, len as i32) as u32
135 }
136}
137
138fn create_hashes_dictionary<K: ArrowDictionaryKeyType>(
140 array: &ArrayRef,
141 hashes_buffer: &mut [u32],
142 first_col: bool,
143) -> datafusion::common::Result<()> {
144 let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
145 if !first_col {
146 let unpacked = take(dict_array.values().as_ref(), dict_array.keys(), None)?;
148 create_murmur3_hashes(&[unpacked], hashes_buffer)?;
149 } else {
150 let dict_values = Arc::clone(dict_array.values());
154 let mut dict_hashes = vec![42; dict_values.len()];
156 create_murmur3_hashes(&[dict_values], &mut dict_hashes)?;
157 for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
158 if let Some(key) = key {
159 let idx = key.to_usize().ok_or_else(|| {
160 DataFusionError::Internal(format!(
161 "Can not convert key value {:?} to usize in dictionary of type {:?}",
162 key,
163 dict_array.data_type()
164 ))
165 })?;
166 *hash = dict_hashes[idx]
167 } }
169 }
170 Ok(())
171}
172
/// Computes Spark-compatible murmur3 hashes for `arrays`, updating `hashes_buffer` in place.
///
/// `hashes_buffer` supplies the starting hash for each row and holds the combined per-row
/// hash afterwards; e.g. `spark_murmur3_hash` pre-fills it with the seed and reads it back
/// after this call. The per-type hashing code is generated by `create_hashes_internal!`. That
/// macro is given `spark_compatible_murmur3_hash` for individual values and
/// `create_hashes_dictionary` for dictionary-encoded columns.
///
/// NOTE(review): presumably every array must have `hashes_buffer.len()` rows, and null values
/// leave the row's hash unchanged. Both behaviors live in `create_hashes_internal!`, which is
/// defined elsewhere; confirm there.
///
/// Returns `hashes_buffer` itself on success.
///
/// # Errors
///
/// Errors raised while hashing a column (e.g. by `create_hashes_dictionary`) presumably
/// propagate out through the macro expansion.
pub fn create_murmur3_hashes<'a>(
    arrays: &[ArrayRef],
    hashes_buffer: &'a mut [u32],
) -> datafusion::common::Result<&'a mut [u32]> {
    // The macro takes these identifiers directly and expands to the per-column hashing loop.
    create_hashes_internal!(
        arrays,
        hashes_buffer,
        spark_compatible_murmur3_hash,
        create_hashes_dictionary
    );
    Ok(hashes_buffer)
}
190
#[cfg(test)]
mod tests {
    use arrow::array::{Float32Array, Float64Array};
    use std::sync::Arc;

    use super::spark_compatible_murmur3_hash;
    use crate::murmur3::create_murmur3_hashes;
    use crate::test_hashes_with_nulls;
    use datafusion::arrow::array::{ArrayRef, Int32Array, Int64Array, Int8Array, StringArray};

    /// Hashes `values` as an array of type `T` (with and without nulls) and checks the
    /// per-row results against `expected`.
    fn test_murmur3_hash<I: Clone, T: arrow::array::Array + From<Vec<Option<I>>> + 'static>(
        values: Vec<Option<I>>,
        expected: Vec<u32>,
    ) {
        test_hashes_with_nulls!(create_murmur3_hashes, T, values, expected, u32);
    }

    #[test]
    fn test_i8() {
        test_murmur3_hash::<i8, Int8Array>(
            vec![Some(1), Some(0), Some(-1), Some(i8::MAX), Some(i8::MIN)],
            vec![0xdea578e3, 0x379fae8f, 0xa0590e3d, 0x43b4d8ed, 0x422a1365],
        );
    }

    #[test]
    fn test_i32() {
        test_murmur3_hash::<i32, Int32Array>(
            vec![Some(1), Some(0), Some(-1), Some(i32::MAX), Some(i32::MIN)],
            vec![0xdea578e3, 0x379fae8f, 0xa0590e3d, 0x07fb67e7, 0x2b1f0fc6],
        );
    }

    #[test]
    fn test_i64() {
        test_murmur3_hash::<i64, Int64Array>(
            vec![Some(1), Some(0), Some(-1), Some(i64::MAX), Some(i64::MIN)],
            vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb],
        );
    }

    #[test]
    fn test_f32() {
        test_murmur3_hash::<f32, Float32Array>(
            vec![
                Some(1.0),
                Some(0.0),
                Some(-0.0),
                Some(-1.0),
                Some(99999999999.99999999999),
                Some(-99999999999.99999999999),
            ],
            vec![
                0xe434cc39, 0x379fae8f, 0x379fae8f, 0xdc0da8eb, 0xcbdc340f, 0xc0361c86,
            ],
        );
    }

    #[test]
    fn test_f64() {
        test_murmur3_hash::<f64, Float64Array>(
            vec![
                Some(1.0),
                Some(0.0),
                Some(-0.0),
                Some(-1.0),
                Some(99999999999.99999999999),
                Some(-99999999999.99999999999),
            ],
            vec![
                0xe4876492, 0x9c67b85d, 0x9c67b85d, 0x13d81357, 0xb87e1595, 0xa0eef9f9,
            ],
        );
    }

    #[test]
    fn test_str() {
        let input = [
            "hello", "bar", "", "😁", "天地", "a", "ab", "abc", "abcd", "abcde",
        ]
        .iter()
        .map(|s| Some(s.to_string()))
        .collect::<Vec<Option<String>>>();
        let expected: Vec<u32> = vec![
            3286402344, 2486176763, 142593372, 885025535, 2395000894, 1485273170, 0xfa37157b,
            1322437556, 0xe860e5cc, 814637928,
        ];

        test_murmur3_hash::<String, StringArray>(input, expected);
    }

    /// Exercises the byte-level hash directly, independent of the array-hashing macro.
    #[test]
    fn test_spark_compatible_murmur3_hash_bytes() {
        // Seed used by Spark's `hash` function and by the array tests above.
        const SEED: u32 = 42;

        // Only whole 4-byte words, only tail bytes, empty input, and a word followed by tail
        // bytes >= 0x80 (which must be sign-extended).
        assert_eq!(spark_compatible_murmur3_hash("abcd", SEED), 0xe860e5cc);
        assert_eq!(spark_compatible_murmur3_hash("bar", SEED), 2486176763);
        assert_eq!(spark_compatible_murmur3_hash("", SEED), 142593372);
        assert_eq!(spark_compatible_murmur3_hash("天地", SEED), 2395000894);

        // Integers are hashed as their little-endian bytes, matching the primitive tests.
        assert_eq!(spark_compatible_murmur3_hash(1i32.to_le_bytes(), SEED), 0xdea578e3);
        assert_eq!(spark_compatible_murmur3_hash(1i64.to_le_bytes(), SEED), 0x99f0149d);

        // Any `AsRef<[u8]>` input with the same bytes hashes identically.
        assert_eq!(
            spark_compatible_murmur3_hash(b"hello".to_vec(), SEED),
            spark_compatible_murmur3_hash("hello", SEED)
        );
    }
}