datafusion_comet_spark_expr/hash_funcs/
murmur3.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::create_hashes_internal;
19use arrow::compute::take;
20use arrow_array::types::ArrowDictionaryKeyType;
21use arrow_array::{Array, ArrayRef, ArrowNativeTypeOp, DictionaryArray, Int32Array};
22use arrow_buffer::ArrowNativeType;
23use datafusion_common::{internal_err, DataFusionError, ScalarValue};
24use datafusion_expr::ColumnarValue;
25use std::sync::Arc;
26
27/// Spark compatible murmur3 hash (just `hash` in Spark) in vectorized execution fashion
28pub fn spark_murmur3_hash(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
29    let length = args.len();
30    let seed = &args[length - 1];
31    match seed {
32        ColumnarValue::Scalar(ScalarValue::Int32(Some(seed))) => {
33            // iterate over the arguments to find out the length of the array
34            let num_rows = args[0..args.len() - 1]
35                .iter()
36                .find_map(|arg| match arg {
37                    ColumnarValue::Array(array) => Some(array.len()),
38                    ColumnarValue::Scalar(_) => None,
39                })
40                .unwrap_or(1);
41            let mut hashes: Vec<u32> = vec![0_u32; num_rows];
42            hashes.fill(*seed as u32);
43            let arrays = args[0..args.len() - 1]
44                .iter()
45                .map(|arg| match arg {
46                    ColumnarValue::Array(array) => Arc::clone(array),
47                    ColumnarValue::Scalar(scalar) => {
48                        scalar.clone().to_array_of_size(num_rows).unwrap()
49                    }
50                })
51                .collect::<Vec<ArrayRef>>();
52            create_murmur3_hashes(&arrays, &mut hashes)?;
53            if num_rows == 1 {
54                Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(
55                    hashes[0] as i32,
56                ))))
57            } else {
58                let hashes: Vec<i32> = hashes.into_iter().map(|x| x as i32).collect();
59                Ok(ColumnarValue::Array(Arc::new(Int32Array::from(hashes))))
60            }
61        }
62        _ => {
63            internal_err!(
64                "The seed of function murmur3_hash must be an Int32 scalar value, but got: {:?}.",
65                seed
66            )
67        }
68    }
69}
70
/// Spark-compatible murmur3 hash function.
///
/// Reproduces Spark's `Murmur3_x86_32.hashUnsafeBytes`. The input is consumed as
/// little-endian 32-bit words. Any trailing `len % 4` bytes are then mixed in one at a
/// time as sign-extended bytes, which differs from the reference MurmurHash3 tail
/// handling but matches Spark.
///
/// `seed` is reinterpreted bit-for-bit as a signed 32-bit value. The returned value is
/// the signed Spark hash reinterpreted as `u32`.
#[inline]
pub fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 {
    #[inline]
    fn mix_k1(k1: i32) -> i32 {
        k1.wrapping_mul(0xcc9e2d51u32 as i32)
            .rotate_left(15)
            .wrapping_mul(0x1b873593u32 as i32)
    }

    #[inline]
    fn mix_h1(h1: i32, k1: i32) -> i32 {
        (h1 ^ k1)
            .rotate_left(13)
            .wrapping_mul(5)
            .wrapping_add(0xe6546b64u32 as i32)
    }

    #[inline]
    fn fmix(mut h1: i32, len: i32) -> i32 {
        // Java's `>>>` (logical shift) is emulated by shifting as u32.
        h1 ^= len;
        h1 ^= (h1 as u32 >> 16) as i32;
        h1 = h1.wrapping_mul(0x85ebca6bu32 as i32);
        h1 ^= (h1 as u32 >> 13) as i32;
        h1 = h1.wrapping_mul(0xc2b2ae35u32 as i32);
        h1 ^= (h1 as u32 >> 16) as i32;
        h1
    }

    let data = data.as_ref();
    let words = data.chunks_exact(4);
    let tail = words.remainder();

    // Aligned part: always decode words as little-endian, independent of host byte
    // order. (The previous big-endian path used `reverse_bits`, which reverses bits
    // rather than bytes and produced wrong hashes on big-endian targets.)
    let h1 = words.fold(seed as i32, |h1, word| {
        let word = i32::from_le_bytes([word[0], word[1], word[2], word[3]]);
        mix_h1(h1, mix_k1(word))
    });

    // Unaligned tail: one byte at a time, sign-extended like Java's `byte` -> `int`.
    let h1 = tail
        .iter()
        .fold(h1, |h1, &byte| mix_h1(h1, mix_k1(byte as i8 as i32)));

    // The length is mixed in as a 32-bit int, as Spark does.
    fmix(h1, data.len() as i32) as u32
}
137
138/// Hash the values in a dictionary array
139fn create_hashes_dictionary<K: ArrowDictionaryKeyType>(
140    array: &ArrayRef,
141    hashes_buffer: &mut [u32],
142    first_col: bool,
143) -> datafusion_common::Result<()> {
144    let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
145    if !first_col {
146        // unpack the dictionary array as each row may have a different hash input
147        let unpacked = take(dict_array.values().as_ref(), dict_array.keys(), None)?;
148        create_murmur3_hashes(&[unpacked], hashes_buffer)?;
149    } else {
150        // For the first column, hash each dictionary value once, and then use
151        // that computed hash for each key value to avoid a potentially
152        // expensive redundant hashing for large dictionary elements (e.g. strings)
153        let dict_values = Arc::clone(dict_array.values());
154        // same initial seed as Spark
155        let mut dict_hashes = vec![42; dict_values.len()];
156        create_murmur3_hashes(&[dict_values], &mut dict_hashes)?;
157        for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
158            if let Some(key) = key {
159                let idx = key.to_usize().ok_or_else(|| {
160                    DataFusionError::Internal(format!(
161                        "Can not convert key value {:?} to usize in dictionary of type {:?}",
162                        key,
163                        dict_array.data_type()
164                    ))
165                })?;
166                *hash = dict_hashes[idx]
167            } // no update for Null, consistent with other hashes
168        }
169    }
170    Ok(())
171}
172
/// Creates Spark-compatible murmur3 hash values for every row, based on the values in
/// the columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`.
/// `hashes_buffer` should be pre-sized appropriately. Its entries are the starting
/// values the columns are hashed into; `spark_murmur3_hash` pre-fills it with the seed.
///
/// Per-value hashing uses `spark_compatible_murmur3_hash`, and dictionary columns go
/// through `create_hashes_dictionary`. Dispatch on column type is done by the
/// `create_hashes_internal!` macro.
///
/// Returns the same `hashes_buffer`, updated in place.
///
/// # Errors
/// Errors raised inside `create_hashes_internal!` are presumably returned early from
/// this function (e.g. for unsupported column types). The exact set is defined by that
/// macro — NOTE(review): confirm against its definition.
pub fn create_murmur3_hashes<'a>(
    arrays: &[ArrayRef],
    hashes_buffer: &'a mut [u32],
) -> datafusion_common::Result<&'a mut [u32]> {
    create_hashes_internal!(
        arrays,
        hashes_buffer,
        spark_compatible_murmur3_hash,
        create_hashes_dictionary
    );
    Ok(hashes_buffer)
}
190
#[cfg(test)]
mod tests {
    use arrow::array::{Float32Array, Float64Array};
    use std::sync::Arc;

    use crate::murmur3::create_murmur3_hashes;
    use crate::test_hashes_with_nulls;
    use datafusion::arrow::array::{ArrayRef, Int32Array, Int64Array, Int8Array, StringArray};
    use datafusion_common::ScalarValue;
    use datafusion_expr::ColumnarValue;

    use super::{spark_compatible_murmur3_hash, spark_murmur3_hash};

    fn test_murmur3_hash<I: Clone, T: arrow_array::Array + From<Vec<Option<I>>> + 'static>(
        values: Vec<Option<I>>,
        expected: Vec<u32>,
    ) {
        test_hashes_with_nulls!(create_murmur3_hashes, T, values, expected, u32);
    }

    #[test]
    fn test_i8() {
        test_murmur3_hash::<i8, Int8Array>(
            vec![Some(1), Some(0), Some(-1), Some(i8::MAX), Some(i8::MIN)],
            vec![0xdea578e3, 0x379fae8f, 0xa0590e3d, 0x43b4d8ed, 0x422a1365],
        );
    }

    #[test]
    fn test_i32() {
        test_murmur3_hash::<i32, Int32Array>(
            vec![Some(1), Some(0), Some(-1), Some(i32::MAX), Some(i32::MIN)],
            vec![0xdea578e3, 0x379fae8f, 0xa0590e3d, 0x07fb67e7, 0x2b1f0fc6],
        );
    }

    #[test]
    fn test_i64() {
        test_murmur3_hash::<i64, Int64Array>(
            vec![Some(1), Some(0), Some(-1), Some(i64::MAX), Some(i64::MIN)],
            vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb],
        );
    }

    #[test]
    fn test_f32() {
        test_murmur3_hash::<f32, Float32Array>(
            vec![
                Some(1.0),
                Some(0.0),
                Some(-0.0),
                Some(-1.0),
                Some(99999999999.99999999999),
                Some(-99999999999.99999999999),
            ],
            vec![
                0xe434cc39, 0x379fae8f, 0x379fae8f, 0xdc0da8eb, 0xcbdc340f, 0xc0361c86,
            ],
        );
    }

    #[test]
    fn test_f64() {
        test_murmur3_hash::<f64, Float64Array>(
            vec![
                Some(1.0),
                Some(0.0),
                Some(-0.0),
                Some(-1.0),
                Some(99999999999.99999999999),
                Some(-99999999999.99999999999),
            ],
            vec![
                0xe4876492, 0x9c67b85d, 0x9c67b85d, 0x13d81357, 0xb87e1595, 0xa0eef9f9,
            ],
        );
    }

    #[test]
    fn test_str() {
        let input = [
            "hello", "bar", "", "😁", "天地", "a", "ab", "abc", "abcd", "abcde",
        ]
        .iter()
        .map(|s| Some(s.to_string()))
        .collect::<Vec<Option<String>>>();
        let expected: Vec<u32> = vec![
            3286402344, 2486176763, 142593372, 885025535, 2395000894, 1485273170, 0xfa37157b,
            1322437556, 0xe860e5cc, 814637928,
        ];

        test_murmur3_hash::<String, StringArray>(input, expected);
    }

    /// Pins the byte-level hash directly: aligned words, 1-3 byte tails, and a tail of
    /// non-ASCII bytes that must be sign-extended. Values agree with `test_i32`,
    /// `test_i64` and `test_str` (seed 42).
    #[test]
    fn test_spark_compatible_murmur3_hash_bytes() {
        assert_eq!(spark_compatible_murmur3_hash(1_i32.to_le_bytes(), 42), 0xdea578e3);
        assert_eq!(spark_compatible_murmur3_hash(1_i64.to_le_bytes(), 42), 0x99f0149d);
        let cases: [(&str, u32); 7] = [
            ("", 142593372),
            ("a", 1485273170),
            ("ab", 0xfa37157b),
            ("abc", 1322437556),
            ("abcd", 0xe860e5cc),
            ("abcde", 814637928),
            ("天地", 2395000894),
        ];
        for (input, expected) in cases {
            assert_eq!(
                spark_compatible_murmur3_hash(input, 42),
                expected,
                "input: {:?}",
                input
            );
        }
    }

    /// With only scalar inputs the function yields a single scalar hash.
    #[test]
    fn test_spark_murmur3_hash_all_scalars() {
        let args = [
            ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
            ColumnarValue::Scalar(ScalarValue::Int32(Some(42))),
        ];
        let result = spark_murmur3_hash(&args).unwrap();
        assert!(matches!(
            result,
            ColumnarValue::Scalar(ScalarValue::Int32(Some(h))) if h == 0xdea578e3_u32 as i32
        ));
    }
}