Skip to main content

datafusion_spark/function/hash/
xxhash64.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{
21    Array, ArrayRef, DictionaryArray, Int64Array, types::ArrowDictionaryKeyType,
22};
23use arrow::buffer::{Buffer, ScalarBuffer};
24use arrow::compute::take;
25use arrow::datatypes::{ArrowNativeType, DataType, Field, FieldRef};
26use datafusion_common::{Result, ScalarValue, internal_err};
27use datafusion_expr::{
28    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
29    Volatility,
30};
31use twox_hash::XxHash64;
32
33use crate::create_hashes_internal;
34
/// Seed Spark uses for `xxhash64`: every row's running hash starts from this
/// value, so it is also the result for a row whose inputs are all NULL.
const DEFAULT_SEED: u64 = 42;
36
37/// Spark-compatible xxhash64 function.
38/// <https://spark.apache.org/docs/latest/api/sql/index.html#xxhash64>
39#[derive(Debug, PartialEq, Eq, Hash)]
40pub struct SparkXxhash64 {
41    signature: Signature,
42}
43
44impl Default for SparkXxhash64 {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl SparkXxhash64 {
51    pub fn new() -> Self {
52        Self {
53            signature: Signature::variadic_any(Volatility::Immutable),
54        }
55    }
56}
57
58impl ScalarUDFImpl for SparkXxhash64 {
59    fn name(&self) -> &str {
60        "xxhash64"
61    }
62
63    fn signature(&self) -> &Signature {
64        &self.signature
65    }
66
67    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
68        internal_err!("return_field_from_args should be used instead")
69    }
70
71    fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
72        // Spark's HashExpression overrides nullable to false: NULL inputs are
73        // skipped and the seed is used, so the result is never null.
74        Ok(Arc::new(Field::new(self.name(), DataType::Int64, false)))
75    }
76
77    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
78        let num_rows = args.number_rows;
79        let mut hashes: Vec<u64> = vec![DEFAULT_SEED; num_rows];
80
81        let arrays = ColumnarValue::values_to_arrays(&args.args)?;
82        create_xxhash64_hashes(&arrays, &mut hashes)?;
83
84        if num_rows == 1 {
85            Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(
86                hashes[0] as i64,
87            ))))
88        } else {
89            // Reinterpret Vec<u64> as ScalarBuffer<i64> without copying — both
90            // types have identical layout, and `as i64` is a bitcast.
91            let buffer = ScalarBuffer::<i64>::from(Buffer::from_vec(hashes));
92            Ok(ColumnarValue::Array(Arc::new(Int64Array::new(
93                buffer, None,
94            ))))
95        }
96    }
97}
98
99#[inline]
100fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: u64) -> u64 {
101    XxHash64::oneshot(seed, data.as_ref())
102}
103
104/// Hash the values in a dictionary array using xxhash64.
105fn create_xxhash64_hashes_dictionary<K: ArrowDictionaryKeyType>(
106    array: &ArrayRef,
107    hashes_buffer: &mut [u64],
108    first_col: bool,
109) -> Result<()> {
110    let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
111    if !first_col {
112        let unpacked = take(dict_array.values().as_ref(), dict_array.keys(), None)?;
113        create_xxhash64_hashes(&[unpacked], hashes_buffer)?;
114    } else {
115        // Hash each dictionary value once, then look up by key. This avoids
116        // redundant hashing of large dictionary entries (e.g. long strings).
117        let dict_values = Arc::clone(dict_array.values());
118        let mut dict_hashes = vec![DEFAULT_SEED; dict_values.len()];
119        create_xxhash64_hashes(&[dict_values], &mut dict_hashes)?;
120
121        for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
122            if let Some(key) = key {
123                *hash = dict_hashes[key.as_usize()]
124            }
125            // No update for Null keys, consistent with other types.
126        }
127    }
128    Ok(())
129}
130
131/// Create xxhash64 hash values for every row, based on the values in the columns.
132///
133/// The number of rows to hash is determined by `hashes_buffer.len()`.
134/// `hashes_buffer` should be pre-sized appropriately and seeded with the
135/// initial hash value (Spark uses `42`).
136fn create_xxhash64_hashes(arrays: &[ArrayRef], hashes_buffer: &mut [u64]) -> Result<()> {
137    create_hashes_internal!(
138        arrays,
139        hashes_buffer,
140        spark_compatible_xxhash64,
141        create_xxhash64_hashes_dictionary,
142        create_xxhash64_hashes
143    );
144    Ok(())
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, ListArray, StringArray,
        StructArray,
    };
    use arrow::buffer::OffsetBuffer;
    use arrow::datatypes::Int32Type;

    /// Spark's xxhash64 seed, pinned locally so the known-answer vectors below
    /// stay tied to 42 independently of `DEFAULT_SEED`.
    const SEED: u64 = 42;

    /// Hash `columns` row-wise into a fresh `rows`-long buffer seeded with
    /// `DEFAULT_SEED`.
    fn hash_columns(columns: &[ArrayRef], rows: usize) -> Vec<u64> {
        let mut hashes = vec![DEFAULT_SEED; rows];
        create_xxhash64_hashes(columns, &mut hashes).unwrap();
        hashes
    }

    /// Dictionary-encode `values` with `Int32` keys.
    fn string_dictionary(values: &[&str]) -> ArrayRef {
        let dict: DictionaryArray<Int32Type> = values.iter().copied().collect();
        Arc::new(dict)
    }

    #[test]
    fn test_xxhash64_nullability() -> Result<()> {
        // Spark's xxhash64 is never null (NULL args are skipped, seed is returned),
        // so the output field is non-nullable regardless of input nullability.
        let arg_fields: [FieldRef; 2] = [
            Arc::new(Field::new("a", DataType::Int32, true)),
            Arc::new(Field::new("b", DataType::Int32, false)),
        ];
        let out = SparkXxhash64::new().return_field_from_args(ReturnFieldArgs {
            arg_fields: &arg_fields,
            scalar_arguments: &[None, None],
        })?;

        assert!(!out.is_nullable());
        assert_eq!(out.data_type(), &DataType::Int64);
        Ok(())
    }

    #[test]
    fn test_xxhash64_i32() {
        let vectors: [(i32, u64); 3] = [
            (1, 0xa309b38455455929),
            (0, 0x3229fbc4681e48f3),
            (-1, 0x1bfdda8861c06e45),
        ];
        for (value, expected) in vectors {
            assert_eq!(spark_compatible_xxhash64(value.to_le_bytes(), SEED), expected);
        }
    }

    #[test]
    fn test_xxhash64_i32_boundary() {
        for value in [i32::MAX, i32::MIN] {
            assert_ne!(spark_compatible_xxhash64(value.to_le_bytes(), SEED), SEED);
        }
    }

    #[test]
    fn test_xxhash64_i8() {
        // i8 is widened to i32 before hashing
        let widened = i32::from(1i8);
        assert_eq!(
            spark_compatible_xxhash64(widened.to_le_bytes(), SEED),
            spark_compatible_xxhash64(1i32.to_le_bytes(), SEED),
        );
    }

    #[test]
    fn test_xxhash64_i64() {
        let vectors: [(i64, u64); 3] = [
            (1, 0x9ed50fd59358d232),
            (0, 0xb71b47ebda15746c),
            (-1, 0x358ae035bfb46fd2),
        ];
        for (value, expected) in vectors {
            assert_eq!(spark_compatible_xxhash64(value.to_le_bytes(), SEED), expected);
        }
    }

    #[test]
    fn test_xxhash64_i64_boundary() {
        for value in [i64::MAX, i64::MIN] {
            assert_ne!(spark_compatible_xxhash64(value.to_le_bytes(), SEED), SEED);
        }
    }

    /// Spark normalizes `-0.0` to `0.0` before hashing, so both produce
    /// the same hash. Exercise the dispatch through `create_xxhash64_hashes`
    /// to cover the `hash_array_primitive_float!` normalization path.
    #[test]
    fn test_xxhash64_negative_zero_f32() {
        let column: ArrayRef = Arc::new(Float32Array::from(vec![0.0f32, -0.0f32]));
        let hashes = hash_columns(&[column], 2);

        assert_eq!(hashes[0], hashes[1]);
        // +0.0 hashes like the i32 with the same all-zero bit pattern.
        assert_eq!(hashes[0], spark_compatible_xxhash64(0i32.to_le_bytes(), SEED));
    }

    #[test]
    fn test_xxhash64_negative_zero_f64() {
        let column: ArrayRef = Arc::new(Float64Array::from(vec![0.0f64, -0.0f64]));
        let hashes = hash_columns(&[column], 2);

        assert_eq!(hashes[0], hashes[1]);
        // +0.0 hashes like the i64 with the same all-zero bit pattern.
        assert_eq!(hashes[0], spark_compatible_xxhash64(0i64.to_le_bytes(), SEED));
    }

    #[test]
    fn test_xxhash64_string() {
        let vectors: [(&str, u64); 3] = [
            ("hello", 0xc3629e6318d53932),
            ("", 0x98b1582b0977e704),
            ("abc", 0x13c1d910702770e6),
        ];
        for (text, expected) in vectors {
            assert_eq!(spark_compatible_xxhash64(text, SEED), expected);
        }
    }

    #[test]
    fn test_xxhash64_string_emoji_cjk() {
        let emoji = spark_compatible_xxhash64("😁", SEED);
        let cjk = spark_compatible_xxhash64("天地", SEED);

        assert_ne!(emoji, SEED);
        assert_ne!(cjk, SEED);
        assert_ne!(emoji, cjk);
    }

    #[test]
    fn test_xxhash64_dictionary_string() {
        let column = string_dictionary(&["hello", "world", "abc", "hello", "world"]);
        let hashes = hash_columns(&[column], 5);

        for (row, word) in ["hello", "world", "abc"].into_iter().enumerate() {
            assert_eq!(hashes[row], spark_compatible_xxhash64(word, SEED));
        }
        // Repeated dictionary entries must produce identical hashes.
        assert_eq!(hashes[3], hashes[0]);
        assert_eq!(hashes[4], hashes[1]);
    }

    #[test]
    fn test_xxhash64_dictionary_int() {
        let keys = Int32Array::from(vec![0, 1, 2, 0, 1]);
        let values = Int32Array::from(vec![100, 200, 300]);
        let dict = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
        let hashes = hash_columns(&[Arc::new(dict) as ArrayRef], 5);

        for (row, value) in [100i32, 200, 300].into_iter().enumerate() {
            assert_eq!(hashes[row], spark_compatible_xxhash64(value.to_le_bytes(), SEED));
        }
        assert_eq!(hashes[3], hashes[0]);
        assert_eq!(hashes[4], hashes[1]);
    }

    #[test]
    fn test_xxhash64_dictionary_with_nulls() {
        let keys = Int32Array::from(vec![Some(0), None, Some(1), Some(0), None]);
        let values = StringArray::from(vec!["hello", "world"]);
        let dict = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
        let hashes = hash_columns(&[Arc::new(dict) as ArrayRef], 5);

        let hello = spark_compatible_xxhash64("hello", SEED);
        let world = spark_compatible_xxhash64("world", SEED);
        // Null keys leave the seed untouched.
        assert_eq!(hashes, vec![hello, DEFAULT_SEED, world, hello, DEFAULT_SEED]);
    }

    #[test]
    fn test_xxhash64_dictionary_non_first_column() {
        let words = ["hello", "world", "abc"];
        let column = string_dictionary(&words);
        let seeds = [123u64, 456, 789];

        let mut hashes = seeds.to_vec();
        create_xxhash64_hashes_dictionary::<Int32Type>(&column, &mut hashes, false).unwrap();

        // Each row is hashed on top of its own prior value.
        for ((&hash, seed), word) in hashes.iter().zip(seeds).zip(words) {
            assert_eq!(hash, spark_compatible_xxhash64(word, seed));
        }
    }

    #[test]
    fn test_xxhash64_fixed_size_binary() {
        let first = [0x01u8, 0x02, 0x03, 0x04];
        let second = [0x05u8, 0x06, 0x07, 0x08];
        let zeros = [0x00u8; 4];
        let rows: Vec<Option<&[u8]>> = vec![
            Some(first.as_slice()),
            Some(second.as_slice()),
            None,
            Some(zeros.as_slice()),
        ];

        let column: ArrayRef = Arc::new(FixedSizeBinaryArray::from(rows.clone()));
        let hashes = hash_columns(&[column], rows.len());

        for (&hash, row) in hashes.iter().zip(rows.iter().copied()) {
            // Null rows keep the seed; others hash their raw bytes.
            let expected =
                row.map_or(DEFAULT_SEED, |bytes| spark_compatible_xxhash64(bytes, SEED));
            assert_eq!(hash, expected);
        }
    }

    #[test]
    fn test_xxhash64_struct() {
        let int_field = Arc::new(Field::new("a", DataType::Int32, false));
        let str_field = Arc::new(Field::new("b", DataType::Utf8, false));
        let struct_array = StructArray::from(vec![
            (
                int_field,
                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
            ),
            (
                str_field,
                Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
            ),
        ]);

        let hashes = hash_columns(&[Arc::new(struct_array) as ArrayRef], 3);

        assert!(hashes.iter().all(|&hash| hash != DEFAULT_SEED));
        assert_ne!(hashes[0], hashes[1]);
        assert_ne!(hashes[1], hashes[2]);
    }

    #[test]
    fn test_xxhash64_list() {
        let offsets = OffsetBuffer::new(vec![0i32, 2, 3, 6].into());
        let list_array = ListArray::new(
            Arc::new(Field::new_list_field(DataType::Int32, false)),
            offsets,
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])),
            None,
        );

        let hashes = hash_columns(&[Arc::new(list_array) as ArrayRef], 3);

        assert!(hashes.iter().all(|&hash| hash != DEFAULT_SEED));
        assert_ne!(hashes[0], hashes[1]);
    }
}