1use std::sync::Arc;
19
20use arrow::array::{
21 Array, ArrayRef, DictionaryArray, Int64Array, types::ArrowDictionaryKeyType,
22};
23use arrow::buffer::{Buffer, ScalarBuffer};
24use arrow::compute::take;
25use arrow::datatypes::{ArrowNativeType, DataType, Field, FieldRef};
26use datafusion_common::{Result, ScalarValue, internal_err};
27use datafusion_expr::{
28 ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
29 Volatility,
30};
31use twox_hash::XxHash64;
32
33use crate::create_hashes_internal;
34
/// Initial value of every per-row hash accumulator, and the seed used when
/// dictionary values are hashed on their own.
///
/// NOTE(review): presumably chosen to match the fixed seed of Spark's
/// `xxhash64` expression — confirm against the Spark implementation.
const DEFAULT_SEED: u64 = 42;
36
/// Scalar UDF exposed under the name `xxhash64`.
///
/// It accepts any number of arguments of any type and yields a non-nullable
/// `Int64` hash, with each row's hash starting from [`DEFAULT_SEED`].
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkXxhash64 {
    /// Argument signature: variadic, any types, immutable volatility.
    signature: Signature,
}
43
44impl Default for SparkXxhash64 {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl SparkXxhash64 {
51 pub fn new() -> Self {
52 Self {
53 signature: Signature::variadic_any(Volatility::Immutable),
54 }
55 }
56}
57
58impl ScalarUDFImpl for SparkXxhash64 {
59 fn name(&self) -> &str {
60 "xxhash64"
61 }
62
63 fn signature(&self) -> &Signature {
64 &self.signature
65 }
66
67 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
68 internal_err!("return_field_from_args should be used instead")
69 }
70
71 fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
72 Ok(Arc::new(Field::new(self.name(), DataType::Int64, false)))
75 }
76
77 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
78 let num_rows = args.number_rows;
79 let mut hashes: Vec<u64> = vec![DEFAULT_SEED; num_rows];
80
81 let arrays = ColumnarValue::values_to_arrays(&args.args)?;
82 create_xxhash64_hashes(&arrays, &mut hashes)?;
83
84 if num_rows == 1 {
85 Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(
86 hashes[0] as i64,
87 ))))
88 } else {
89 let buffer = ScalarBuffer::<i64>::from(Buffer::from_vec(hashes));
92 Ok(ColumnarValue::Array(Arc::new(Int64Array::new(
93 buffer, None,
94 ))))
95 }
96 }
97}
98
99#[inline]
100fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: u64) -> u64 {
101 XxHash64::oneshot(seed, data.as_ref())
102}
103
104fn create_xxhash64_hashes_dictionary<K: ArrowDictionaryKeyType>(
106 array: &ArrayRef,
107 hashes_buffer: &mut [u64],
108 first_col: bool,
109) -> Result<()> {
110 let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
111 if !first_col {
112 let unpacked = take(dict_array.values().as_ref(), dict_array.keys(), None)?;
113 create_xxhash64_hashes(&[unpacked], hashes_buffer)?;
114 } else {
115 let dict_values = Arc::clone(dict_array.values());
118 let mut dict_hashes = vec![DEFAULT_SEED; dict_values.len()];
119 create_xxhash64_hashes(&[dict_values], &mut dict_hashes)?;
120
121 for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
122 if let Some(key) = key {
123 *hash = dict_hashes[key.as_usize()]
124 }
125 }
127 }
128 Ok(())
129}
130
/// Folds every array in `arrays` into `hashes_buffer`, one slot per row.
///
/// The per-type work is delegated to the crate's `create_hashes_internal!`
/// macro. It is handed [`spark_compatible_xxhash64`] as the hashing
/// primitive, [`create_xxhash64_hashes_dictionary`] for dictionary-encoded
/// columns, and this function itself (presumably so the macro can recurse
/// into nested types — its body is not visible from this file).
///
/// On entry each slot of `hashes_buffer` holds the running hash for its row
/// ([`DEFAULT_SEED`] for a fresh computation); on success it holds the
/// updated hash.
///
/// # Errors
/// NOTE(review): errors are presumably raised from inside the macro
/// expansion (e.g. unsupported data types) — confirm against the macro.
fn create_xxhash64_hashes(arrays: &[ArrayRef], hashes_buffer: &mut [u64]) -> Result<()> {
    create_hashes_internal!(
        arrays,
        hashes_buffer,
        spark_compatible_xxhash64,
        create_xxhash64_hashes_dictionary,
        create_xxhash64_hashes
    );
    Ok(())
}
146
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, ListArray,
        StringArray, StructArray,
    };
    use arrow::buffer::OffsetBuffer;
    use arrow::datatypes::Int32Type;

    /// Seed literal used by the known-answer checks.
    const SEED: u64 = 42;

    /// Hashes a single column with every row starting at [`DEFAULT_SEED`].
    fn hash_column(column: ArrayRef) -> Vec<u64> {
        let mut row_hashes = vec![DEFAULT_SEED; column.len()];
        create_xxhash64_hashes(&[column], &mut row_hashes).unwrap();
        row_hashes
    }

    /// The declared output field is a non-nullable `Int64`, whatever the
    /// nullability of the inputs.
    #[test]
    fn test_xxhash64_nullability() -> Result<()> {
        let udf = SparkXxhash64::new();

        let nullable: FieldRef = Arc::new(Field::new("a", DataType::Int32, true));
        let non_nullable: FieldRef = Arc::new(Field::new("b", DataType::Int32, false));

        let field = udf.return_field_from_args(ReturnFieldArgs {
            arg_fields: &[Arc::clone(&nullable), Arc::clone(&non_nullable)],
            scalar_arguments: &[None, None],
        })?;
        assert!(!field.is_nullable());
        assert_eq!(field.data_type(), &DataType::Int64);

        Ok(())
    }

    /// Known answers for little-endian `i32` inputs.
    #[test]
    fn test_xxhash64_i32() {
        let cases: [(i32, u64); 3] = [
            (1, 0xa309b38455455929),
            (0, 0x3229fbc4681e48f3),
            (-1, 0x1bfdda8861c06e45),
        ];
        for (value, expected) in cases {
            assert_eq!(
                spark_compatible_xxhash64(value.to_le_bytes(), SEED),
                expected
            );
        }
    }

    /// Extreme `i32` values must not hash back to the seed.
    #[test]
    fn test_xxhash64_i32_boundary() {
        for value in [i32::MAX, i32::MIN] {
            let hashed = spark_compatible_xxhash64(value.to_le_bytes(), SEED);
            assert_ne!(hashed, SEED);
        }
    }

    /// An `i8` widened to `i32` hashes like the same `i32`.
    ///
    /// NOTE(review): both sides hash identical bytes, so this does not
    /// exercise the `Int8` array path of `create_xxhash64_hashes`.
    #[test]
    fn test_xxhash64_i8() {
        let widened = spark_compatible_xxhash64((1i8 as i32).to_le_bytes(), SEED);
        let direct = spark_compatible_xxhash64(1i32.to_le_bytes(), SEED);
        assert_eq!(widened, direct);
    }

    /// Known answers for little-endian `i64` inputs.
    #[test]
    fn test_xxhash64_i64() {
        let cases: [(i64, u64); 3] = [
            (1, 0x9ed50fd59358d232),
            (0, 0xb71b47ebda15746c),
            (-1, 0x358ae035bfb46fd2),
        ];
        for (value, expected) in cases {
            assert_eq!(
                spark_compatible_xxhash64(value.to_le_bytes(), SEED),
                expected
            );
        }
    }

    /// Extreme `i64` values must not hash back to the seed.
    #[test]
    fn test_xxhash64_i64_boundary() {
        for value in [i64::MAX, i64::MIN] {
            let hashed = spark_compatible_xxhash64(value.to_le_bytes(), SEED);
            assert_ne!(hashed, SEED);
        }
    }

    /// `0.0f32` and `-0.0f32` hash identically, and like the `i32` zero.
    #[test]
    fn test_xxhash64_negative_zero_f32() {
        let zeros: ArrayRef = Arc::new(Float32Array::from(vec![0.0f32, -0.0f32]));
        let hashes = hash_column(zeros);
        assert_eq!(hashes[0], hashes[1]);
        assert_eq!(hashes[0], spark_compatible_xxhash64(0i32.to_le_bytes(), SEED));
    }

    /// `0.0f64` and `-0.0f64` hash identically, and like the `i64` zero.
    #[test]
    fn test_xxhash64_negative_zero_f64() {
        let zeros: ArrayRef = Arc::new(Float64Array::from(vec![0.0f64, -0.0f64]));
        let hashes = hash_column(zeros);
        assert_eq!(hashes[0], hashes[1]);
        assert_eq!(hashes[0], spark_compatible_xxhash64(0i64.to_le_bytes(), SEED));
    }

    /// Known answers for UTF-8 string inputs, including the empty string.
    #[test]
    fn test_xxhash64_string() {
        let cases: [(&str, u64); 3] = [
            ("hello", 0xc3629e6318d53932),
            ("", 0x98b1582b0977e704),
            ("abc", 0x13c1d910702770e6),
        ];
        for (text, expected) in cases {
            assert_eq!(spark_compatible_xxhash64(text, SEED), expected);
        }
    }

    /// Multi-byte UTF-8 inputs produce distinct, non-seed hashes.
    #[test]
    fn test_xxhash64_string_emoji_cjk() {
        let emoji = spark_compatible_xxhash64("😁", SEED);
        let cjk = spark_compatible_xxhash64("天地", SEED);
        assert_ne!(emoji, SEED);
        assert_ne!(cjk, SEED);
        assert_ne!(emoji, cjk);
    }

    /// A string dictionary in the first column hashes like its plain values.
    #[test]
    fn test_xxhash64_dictionary_string() {
        let dict: DictionaryArray<Int32Type> =
            ["hello", "world", "abc", "hello", "world"]
                .into_iter()
                .collect();
        let hashes = hash_column(Arc::new(dict));

        assert_eq!(hashes[0], spark_compatible_xxhash64("hello", SEED));
        assert_eq!(hashes[1], spark_compatible_xxhash64("world", SEED));
        assert_eq!(hashes[2], spark_compatible_xxhash64("abc", SEED));
        // Repeated keys must reuse the same hash.
        assert_eq!(hashes[3], hashes[0]);
        assert_eq!(hashes[4], hashes[1]);
    }

    /// An `Int32` dictionary in the first column hashes like its plain values.
    #[test]
    fn test_xxhash64_dictionary_int() {
        let dict = DictionaryArray::<Int32Type>::try_new(
            Int32Array::from(vec![0, 1, 2, 0, 1]),
            Arc::new(Int32Array::from(vec![100, 200, 300])),
        )
        .unwrap();
        let hashes = hash_column(Arc::new(dict));

        for (row, value) in [100i32, 200, 300].into_iter().enumerate() {
            assert_eq!(
                hashes[row],
                spark_compatible_xxhash64(value.to_le_bytes(), SEED)
            );
        }
        assert_eq!(hashes[3], hashes[0]);
        assert_eq!(hashes[4], hashes[1]);
    }

    /// Rows with a null dictionary key keep the seed.
    #[test]
    fn test_xxhash64_dictionary_with_nulls() {
        let dict = DictionaryArray::<Int32Type>::try_new(
            Int32Array::from(vec![Some(0), None, Some(1), Some(0), None]),
            Arc::new(StringArray::from(vec!["hello", "world"])),
        )
        .unwrap();
        let hashes = hash_column(Arc::new(dict));

        assert_eq!(hashes[0], spark_compatible_xxhash64("hello", SEED));
        assert_eq!(hashes[2], spark_compatible_xxhash64("world", SEED));
        assert_eq!(hashes[3], spark_compatible_xxhash64("hello", SEED));
        assert_eq!(hashes[1], DEFAULT_SEED);
        assert_eq!(hashes[4], DEFAULT_SEED);
    }

    /// With `first_col == false` each row is hashed from its own incoming
    /// accumulator value rather than from the default seed.
    #[test]
    fn test_xxhash64_dictionary_non_first_column() {
        let dict: DictionaryArray<Int32Type> =
            ["hello", "world", "abc"].into_iter().collect();
        let column: ArrayRef = Arc::new(dict);

        let mut hashes = vec![123u64, 456u64, 789u64];
        create_xxhash64_hashes_dictionary::<Int32Type>(&column, &mut hashes, false)
            .unwrap();

        assert_eq!(hashes[0], spark_compatible_xxhash64("hello", 123));
        assert_eq!(hashes[1], spark_compatible_xxhash64("world", 456));
        assert_eq!(hashes[2], spark_compatible_xxhash64("abc", 789));
    }

    /// Fixed-size binary values hash as their raw bytes; nulls keep the seed.
    #[test]
    fn test_xxhash64_fixed_size_binary() {
        let column: ArrayRef = Arc::new(FixedSizeBinaryArray::from(vec![
            Some(&[0x01, 0x02, 0x03, 0x04][..]),
            Some(&[0x05, 0x06, 0x07, 0x08][..]),
            None,
            Some(&[0x00, 0x00, 0x00, 0x00][..]),
        ]));
        let hashes = hash_column(column);

        assert_eq!(
            hashes[0],
            spark_compatible_xxhash64([0x01, 0x02, 0x03, 0x04], SEED)
        );
        assert_eq!(
            hashes[1],
            spark_compatible_xxhash64([0x05, 0x06, 0x07, 0x08], SEED)
        );
        assert_eq!(hashes[2], DEFAULT_SEED);
        assert_eq!(
            hashes[3],
            spark_compatible_xxhash64([0x00, 0x00, 0x00, 0x00], SEED)
        );
    }

    /// Struct rows produce non-seed hashes that differ between adjacent rows.
    #[test]
    fn test_xxhash64_struct() {
        let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let strs: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let rows = StructArray::from(vec![
            (Arc::new(Field::new("a", DataType::Int32, false)), ints),
            (Arc::new(Field::new("b", DataType::Utf8, false)), strs),
        ]);
        let hashes = hash_column(Arc::new(rows));

        assert!(hashes.iter().all(|hash| *hash != DEFAULT_SEED));
        assert_ne!(hashes[0], hashes[1]);
        assert_ne!(hashes[1], hashes[2]);
    }

    /// List rows produce non-seed hashes; the first two rows differ.
    #[test]
    fn test_xxhash64_list() {
        let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
        let offsets = OffsetBuffer::new(vec![0i32, 2, 3, 6].into());
        let items = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
        let lists = ListArray::new(item_field, offsets, items, None);
        let hashes = hash_column(Arc::new(lists));

        assert!(hashes.iter().all(|hash| *hash != DEFAULT_SEED));
        assert_ne!(hashes[0], hashes[1]);
    }
}