datafusion_common/
hash_utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Functionality used both on logical and physical plans
19
20use ahash::RandomState;
21use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
22use arrow::array::*;
23use arrow::datatypes::*;
24#[cfg(not(feature = "force_hash_collisions"))]
25use arrow::{downcast_dictionary_array, downcast_primitive_array};
26
27#[cfg(not(feature = "force_hash_collisions"))]
28use crate::cast::{
29    as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
30    as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
31    as_string_array, as_string_view_array, as_struct_array, as_union_array,
32};
33use crate::error::Result;
34use crate::error::{_internal_datafusion_err, _internal_err};
35use std::cell::RefCell;
36
37// Combines two hashes into one hash
38#[inline]
39pub fn combine_hashes(l: u64, r: u64) -> u64 {
40    let hash = (17 * 37u64).wrapping_add(l);
41    hash.wrapping_mul(37).wrapping_add(r)
42}
43
44/// Maximum size for the thread-local hash buffer before truncation (4MB = 524,288 u64 elements).
45/// The goal of this is to avoid unbounded memory growth that would appear as a memory leak.
46/// We allow temporary allocations beyond this size, but after use the buffer is truncated
47/// to this size.
48const MAX_BUFFER_SIZE: usize = 524_288;
49
50thread_local! {
51    /// Thread-local buffer for hash computations to avoid repeated allocations.
52    /// The buffer is reused across calls and truncated if it exceeds MAX_BUFFER_SIZE.
53    /// Defaults to a capacity of 8192 u64 elements which is the default batch size.
54    /// This corresponds to 64KB of memory.
55    static HASH_BUFFER: RefCell<Vec<u64>> = const { RefCell::new(Vec::new()) };
56}
57
58/// Creates hashes for the given arrays using a thread-local buffer, then calls the provided callback
59/// with an immutable reference to the computed hashes.
60///
61/// This function manages a thread-local buffer to avoid repeated allocations. The buffer is automatically
62/// truncated if it exceeds `MAX_BUFFER_SIZE` after use.
63///
64/// # Arguments
65/// * `arrays` - The arrays to hash (must contain at least one array)
66/// * `random_state` - The random state for hashing
67/// * `callback` - A function that receives an immutable reference to the hash slice and returns a result
68///
69/// # Errors
70/// Returns an error if:
71/// - No arrays are provided
72/// - The function is called reentrantly (i.e., the callback invokes `with_hashes` again on the same thread)
73/// - The function is called during or after thread destruction
74///
75/// # Example
76/// ```ignore
77/// use datafusion_common::hash_utils::{with_hashes, RandomState};
78/// use arrow::array::{Int32Array, ArrayRef};
79/// use std::sync::Arc;
80///
81/// let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
82/// let random_state = RandomState::new();
83///
84/// let result = with_hashes([&array], &random_state, |hashes| {
85///     // Use the hashes here
86///     Ok(hashes.len())
87/// })?;
88/// ```
89pub fn with_hashes<I, T, F, R>(
90    arrays: I,
91    random_state: &RandomState,
92    callback: F,
93) -> Result<R>
94where
95    I: IntoIterator<Item = T>,
96    T: AsDynArray,
97    F: FnOnce(&[u64]) -> Result<R>,
98{
99    // Peek at the first array to determine buffer size without fully collecting
100    let mut iter = arrays.into_iter().peekable();
101
102    // Get the required size from the first array
103    let required_size = match iter.peek() {
104        Some(arr) => arr.as_dyn_array().len(),
105        None => return _internal_err!("with_hashes requires at least one array"),
106    };
107
108    HASH_BUFFER.try_with(|cell| {
109        let mut buffer = cell.try_borrow_mut()
110            .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?;
111
112        // Ensure buffer has sufficient length, clearing old values
113        buffer.clear();
114        buffer.resize(required_size, 0);
115
116        // Create hashes in the buffer - this consumes the iterator
117        create_hashes(iter, random_state, &mut buffer[..required_size])?;
118
119        // Execute the callback with an immutable slice
120        let result = callback(&buffer[..required_size])?;
121
122        // Cleanup: truncate if buffer grew too large
123        if buffer.capacity() > MAX_BUFFER_SIZE {
124            buffer.truncate(MAX_BUFFER_SIZE);
125            buffer.shrink_to_fit();
126        }
127
128        Ok(result)
129    }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access thread-local storage during or after thread destruction"))?
130}
131
132#[cfg(not(feature = "force_hash_collisions"))]
133fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
134    if mul_col {
135        hashes_buffer.iter_mut().for_each(|hash| {
136            // stable hash for null value
137            *hash = combine_hashes(random_state.hash_one(1), *hash);
138        })
139    } else {
140        hashes_buffer.iter_mut().for_each(|hash| {
141            *hash = random_state.hash_one(1);
142        })
143    }
144}
145
146pub trait HashValue {
147    fn hash_one(&self, state: &RandomState) -> u64;
148}
149
150impl<T: HashValue + ?Sized> HashValue for &T {
151    fn hash_one(&self, state: &RandomState) -> u64 {
152        T::hash_one(self, state)
153    }
154}
155
156macro_rules! hash_value {
157    ($($t:ty),+) => {
158        $(impl HashValue for $t {
159            fn hash_one(&self, state: &RandomState) -> u64 {
160                state.hash_one(self)
161            }
162        })+
163    };
164}
165hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64, u128);
166hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano);
167
168macro_rules! hash_float_value {
169    ($(($t:ty, $i:ty)),+) => {
170        $(impl HashValue for $t {
171            fn hash_one(&self, state: &RandomState) -> u64 {
172                state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes()))
173            }
174        })+
175    };
176}
177hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
178
179/// Builds hash values of PrimitiveArray and writes them into `hashes_buffer`
180/// If `rehash==true` this combines the previous hash value in the buffer
181/// with the new hash using `combine_hashes`
182#[cfg(not(feature = "force_hash_collisions"))]
183fn hash_array_primitive<T>(
184    array: &PrimitiveArray<T>,
185    random_state: &RandomState,
186    hashes_buffer: &mut [u64],
187    rehash: bool,
188) where
189    T: ArrowPrimitiveType<Native: HashValue>,
190{
191    assert_eq!(
192        hashes_buffer.len(),
193        array.len(),
194        "hashes_buffer and array should be of equal length"
195    );
196
197    if array.null_count() == 0 {
198        if rehash {
199            for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) {
200                *hash = combine_hashes(value.hash_one(random_state), *hash);
201            }
202        } else {
203            for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) {
204                *hash = value.hash_one(random_state);
205            }
206        }
207    } else if rehash {
208        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
209            if !array.is_null(i) {
210                let value = unsafe { array.value_unchecked(i) };
211                *hash = combine_hashes(value.hash_one(random_state), *hash);
212            }
213        }
214    } else {
215        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
216            if !array.is_null(i) {
217                let value = unsafe { array.value_unchecked(i) };
218                *hash = value.hash_one(random_state);
219            }
220        }
221    }
222}
223
224/// Hashes one array into the `hashes_buffer`
225/// If `rehash==true` this combines the previous hash value in the buffer
226/// with the new hash using `combine_hashes`
227#[cfg(not(feature = "force_hash_collisions"))]
228fn hash_array<T>(
229    array: &T,
230    random_state: &RandomState,
231    hashes_buffer: &mut [u64],
232    rehash: bool,
233) where
234    T: ArrayAccessor,
235    T::Item: HashValue,
236{
237    assert_eq!(
238        hashes_buffer.len(),
239        array.len(),
240        "hashes_buffer and array should be of equal length"
241    );
242
243    if array.null_count() == 0 {
244        if rehash {
245            for (i, hash) in hashes_buffer.iter_mut().enumerate() {
246                let value = unsafe { array.value_unchecked(i) };
247                *hash = combine_hashes(value.hash_one(random_state), *hash);
248            }
249        } else {
250            for (i, hash) in hashes_buffer.iter_mut().enumerate() {
251                let value = unsafe { array.value_unchecked(i) };
252                *hash = value.hash_one(random_state);
253            }
254        }
255    } else if rehash {
256        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
257            if !array.is_null(i) {
258                let value = unsafe { array.value_unchecked(i) };
259                *hash = combine_hashes(value.hash_one(random_state), *hash);
260            }
261        }
262    } else {
263        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
264            if !array.is_null(i) {
265                let value = unsafe { array.value_unchecked(i) };
266                *hash = value.hash_one(random_state);
267            }
268        }
269    }
270}
271
272/// Hash a StringView or BytesView array
273///
274/// Templated to optimize inner loop based on presence of nulls and external buffers.
275///
276/// HAS_NULLS: do we have to check null in the inner loop
277/// HAS_BUFFERS: if true, array has external buffers; if false, all strings are inlined/ less then 12 bytes
278/// REHASH: if true, combining with existing hash, otherwise initializing
279#[inline(never)]
280fn hash_string_view_array_inner<
281    T: ByteViewType,
282    const HAS_NULLS: bool,
283    const HAS_BUFFERS: bool,
284    const REHASH: bool,
285>(
286    array: &GenericByteViewArray<T>,
287    random_state: &RandomState,
288    hashes_buffer: &mut [u64],
289) {
290    assert_eq!(
291        hashes_buffer.len(),
292        array.len(),
293        "hashes_buffer and array should be of equal length"
294    );
295
296    let buffers = array.data_buffers();
297    let view_bytes = |view_len: u32, view: u128| {
298        let view = ByteView::from(view);
299        let offset = view.offset as usize;
300        // SAFETY: view is a valid view as it came from the array
301        unsafe {
302            let data = buffers.get_unchecked(view.buffer_index as usize);
303            data.get_unchecked(offset..offset + view_len as usize)
304        }
305    };
306
307    let hashes_and_views = hashes_buffer.iter_mut().zip(array.views().iter());
308    for (i, (hash, &v)) in hashes_and_views.enumerate() {
309        if HAS_NULLS && array.is_null(i) {
310            continue;
311        }
312        let view_len = v as u32;
313        // all views are inlined, no need to access external buffers
314        if !HAS_BUFFERS || view_len <= 12 {
315            if REHASH {
316                *hash = combine_hashes(v.hash_one(random_state), *hash);
317            } else {
318                *hash = v.hash_one(random_state);
319            }
320            continue;
321        }
322        // view is not inlined, so we need to hash the bytes as well
323        let value = view_bytes(view_len, v);
324        if REHASH {
325            *hash = combine_hashes(value.hash_one(random_state), *hash);
326        } else {
327            *hash = value.hash_one(random_state);
328        }
329    }
330}
331
332/// Builds hash values for array views and writes them into `hashes_buffer`
333/// If `rehash==true` this combines the previous hash value in the buffer
334/// with the new hash using `combine_hashes`
335#[cfg(not(feature = "force_hash_collisions"))]
336fn hash_generic_byte_view_array<T: ByteViewType>(
337    array: &GenericByteViewArray<T>,
338    random_state: &RandomState,
339    hashes_buffer: &mut [u64],
340    rehash: bool,
341) {
342    // instantiate the correct version based on presence of nulls and external buffers
343    match (
344        array.null_count() != 0,
345        !array.data_buffers().is_empty(),
346        rehash,
347    ) {
348        // no nulls or buffers ==> hash the inlined views directly
349        // don't call the inner function as Rust seems better able to inline this simpler code (2-3% faster)
350        (false, false, false) => {
351            for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) {
352                *hash = view.hash_one(random_state);
353            }
354        }
355        (false, false, true) => {
356            for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) {
357                *hash = combine_hashes(view.hash_one(random_state), *hash);
358            }
359        }
360        (false, true, false) => hash_string_view_array_inner::<T, false, true, false>(
361            array,
362            random_state,
363            hashes_buffer,
364        ),
365        (false, true, true) => hash_string_view_array_inner::<T, false, true, true>(
366            array,
367            random_state,
368            hashes_buffer,
369        ),
370        (true, false, false) => hash_string_view_array_inner::<T, true, false, false>(
371            array,
372            random_state,
373            hashes_buffer,
374        ),
375        (true, false, true) => hash_string_view_array_inner::<T, true, false, true>(
376            array,
377            random_state,
378            hashes_buffer,
379        ),
380        (true, true, false) => hash_string_view_array_inner::<T, true, true, false>(
381            array,
382            random_state,
383            hashes_buffer,
384        ),
385        (true, true, true) => hash_string_view_array_inner::<T, true, true, true>(
386            array,
387            random_state,
388            hashes_buffer,
389        ),
390    }
391}
392
393/// Helper function to update hash for a dictionary key if the value is valid
394#[cfg(not(feature = "force_hash_collisions"))]
395#[inline]
396fn update_hash_for_dict_key(
397    hash: &mut u64,
398    dict_hashes: &[u64],
399    dict_values: &dyn Array,
400    idx: usize,
401    multi_col: bool,
402) {
403    if dict_values.is_valid(idx) {
404        if multi_col {
405            *hash = combine_hashes(dict_hashes[idx], *hash);
406        } else {
407            *hash = dict_hashes[idx];
408        }
409    }
410    // no update for invalid dictionary value
411}
412
413/// Hash the values in a dictionary array
414#[cfg(not(feature = "force_hash_collisions"))]
415fn hash_dictionary<K: ArrowDictionaryKeyType>(
416    array: &DictionaryArray<K>,
417    random_state: &RandomState,
418    hashes_buffer: &mut [u64],
419    multi_col: bool,
420) -> Result<()> {
421    // Hash each dictionary value once, and then use that computed
422    // hash for each key value to avoid a potentially expensive
423    // redundant hashing for large dictionary elements (e.g. strings)
424    let dict_values = array.values();
425    let mut dict_hashes = vec![0; dict_values.len()];
426    create_hashes([dict_values], random_state, &mut dict_hashes)?;
427
428    // combine hash for each index in values
429    for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
430        if let Some(key) = key {
431            let idx = key.as_usize();
432            update_hash_for_dict_key(
433                hash,
434                &dict_hashes,
435                dict_values.as_ref(),
436                idx,
437                multi_col,
438            );
439        } // no update for Null key
440    }
441    Ok(())
442}
443
444#[cfg(not(feature = "force_hash_collisions"))]
445fn hash_struct_array(
446    array: &StructArray,
447    random_state: &RandomState,
448    hashes_buffer: &mut [u64],
449) -> Result<()> {
450    let nulls = array.nulls();
451    let row_len = array.len();
452
453    let valid_row_indices: Vec<usize> = if let Some(nulls) = nulls {
454        nulls.valid_indices().collect()
455    } else {
456        (0..row_len).collect()
457    };
458
459    // Create hashes for each row that combines the hashes over all the column at that row.
460    let mut values_hashes = vec![0u64; row_len];
461    create_hashes(array.columns(), random_state, &mut values_hashes)?;
462
463    for i in valid_row_indices {
464        let hash = &mut hashes_buffer[i];
465        *hash = combine_hashes(*hash, values_hashes[i]);
466    }
467
468    Ok(())
469}
470
471// only adding this `cfg` b/c this function is only used with this `cfg`
472#[cfg(not(feature = "force_hash_collisions"))]
473fn hash_map_array(
474    array: &MapArray,
475    random_state: &RandomState,
476    hashes_buffer: &mut [u64],
477) -> Result<()> {
478    let nulls = array.nulls();
479    let offsets = array.offsets();
480
481    // Create hashes for each entry in each row
482    let mut values_hashes = vec![0u64; array.entries().len()];
483    create_hashes(array.entries().columns(), random_state, &mut values_hashes)?;
484
485    // Combine the hashes for entries on each row with each other and previous hash for that row
486    if let Some(nulls) = nulls {
487        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
488            if nulls.is_valid(i) {
489                let hash = &mut hashes_buffer[i];
490                for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
491                    *hash = combine_hashes(*hash, *values_hash);
492                }
493            }
494        }
495    } else {
496        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
497            let hash = &mut hashes_buffer[i];
498            for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
499                *hash = combine_hashes(*hash, *values_hash);
500            }
501        }
502    }
503
504    Ok(())
505}
506
507#[cfg(not(feature = "force_hash_collisions"))]
508fn hash_list_array<OffsetSize>(
509    array: &GenericListArray<OffsetSize>,
510    random_state: &RandomState,
511    hashes_buffer: &mut [u64],
512) -> Result<()>
513where
514    OffsetSize: OffsetSizeTrait,
515{
516    let values = array.values();
517    let offsets = array.value_offsets();
518    let nulls = array.nulls();
519    let mut values_hashes = vec![0u64; values.len()];
520    create_hashes([values], random_state, &mut values_hashes)?;
521    if let Some(nulls) = nulls {
522        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
523            if nulls.is_valid(i) {
524                let hash = &mut hashes_buffer[i];
525                for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
526                    *hash = combine_hashes(*hash, *values_hash);
527                }
528            }
529        }
530    } else {
531        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
532            let hash = &mut hashes_buffer[i];
533            for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
534                *hash = combine_hashes(*hash, *values_hash);
535            }
536        }
537    }
538    Ok(())
539}
540
541#[cfg(not(feature = "force_hash_collisions"))]
542fn hash_union_array(
543    array: &UnionArray,
544    random_state: &RandomState,
545    hashes_buffer: &mut [u64],
546) -> Result<()> {
547    use std::collections::HashMap;
548
549    let DataType::Union(union_fields, _mode) = array.data_type() else {
550        unreachable!()
551    };
552
553    let mut child_hashes = HashMap::with_capacity(union_fields.len());
554
555    for (type_id, _field) in union_fields.iter() {
556        let child = array.child(type_id);
557        let mut child_hash_buffer = vec![0; child.len()];
558        create_hashes([child], random_state, &mut child_hash_buffer)?;
559
560        child_hashes.insert(type_id, child_hash_buffer);
561    }
562
563    #[expect(clippy::needless_range_loop)]
564    for i in 0..array.len() {
565        let type_id = array.type_id(i);
566        let child_offset = array.value_offset(i);
567
568        let child_hash = child_hashes.get(&type_id).expect("invalid type_id");
569        hashes_buffer[i] = combine_hashes(hashes_buffer[i], child_hash[child_offset]);
570    }
571
572    Ok(())
573}
574
575#[cfg(not(feature = "force_hash_collisions"))]
576fn hash_fixed_list_array(
577    array: &FixedSizeListArray,
578    random_state: &RandomState,
579    hashes_buffer: &mut [u64],
580) -> Result<()> {
581    let values = array.values();
582    let value_length = array.value_length() as usize;
583    let nulls = array.nulls();
584    let mut values_hashes = vec![0u64; values.len()];
585    create_hashes([values], random_state, &mut values_hashes)?;
586    if let Some(nulls) = nulls {
587        for i in 0..array.len() {
588            if nulls.is_valid(i) {
589                let hash = &mut hashes_buffer[i];
590                for values_hash in
591                    &values_hashes[i * value_length..(i + 1) * value_length]
592                {
593                    *hash = combine_hashes(*hash, *values_hash);
594                }
595            }
596        }
597    } else {
598        for i in 0..array.len() {
599            let hash = &mut hashes_buffer[i];
600            for values_hash in &values_hashes[i * value_length..(i + 1) * value_length] {
601                *hash = combine_hashes(*hash, *values_hash);
602            }
603        }
604    }
605    Ok(())
606}
607
608#[cfg(not(feature = "force_hash_collisions"))]
609fn hash_run_array<R: RunEndIndexType>(
610    array: &RunArray<R>,
611    random_state: &RandomState,
612    hashes_buffer: &mut [u64],
613    rehash: bool,
614) -> Result<()> {
615    // We find the relevant runs that cover potentially sliced arrays, so we can only hash those
616    // values. Then we find the runs that refer to the original runs and ensure that we apply
617    // hashes correctly to the sliced, whether sliced at the start, end, or both.
618    let array_offset = array.offset();
619    let array_len = array.len();
620
621    if array_len == 0 {
622        return Ok(());
623    }
624
625    let run_ends = array.run_ends();
626    let run_ends_values = run_ends.values();
627    let values = array.values();
628
629    let start_physical_index = array.get_start_physical_index();
630    // get_end_physical_index returns the inclusive last index, but we need the exclusive range end
631    // for the operations we use below.
632    let end_physical_index = array.get_end_physical_index() + 1;
633
634    let sliced_values = values.slice(
635        start_physical_index,
636        end_physical_index - start_physical_index,
637    );
638    let mut values_hashes = vec![0u64; sliced_values.len()];
639    create_hashes(
640        std::slice::from_ref(&sliced_values),
641        random_state,
642        &mut values_hashes,
643    )?;
644
645    let mut start_in_slice = 0;
646    for (adjusted_physical_index, &absolute_run_end) in run_ends_values
647        [start_physical_index..end_physical_index]
648        .iter()
649        .enumerate()
650    {
651        let is_null_value = sliced_values.is_null(adjusted_physical_index);
652        let absolute_run_end = absolute_run_end.as_usize();
653
654        let end_in_slice = (absolute_run_end - array_offset).min(array_len);
655
656        if rehash {
657            if !is_null_value {
658                let value_hash = values_hashes[adjusted_physical_index];
659                for hash in hashes_buffer
660                    .iter_mut()
661                    .take(end_in_slice)
662                    .skip(start_in_slice)
663                {
664                    *hash = combine_hashes(value_hash, *hash);
665                }
666            }
667        } else {
668            let value_hash = values_hashes[adjusted_physical_index];
669            hashes_buffer[start_in_slice..end_in_slice].fill(value_hash);
670        }
671
672        start_in_slice = end_in_slice;
673    }
674
675    Ok(())
676}
677
678/// Internal helper function that hashes a single array and either initializes or combines
679/// the hash values in the buffer.
680#[cfg(not(feature = "force_hash_collisions"))]
681fn hash_single_array(
682    array: &dyn Array,
683    random_state: &RandomState,
684    hashes_buffer: &mut [u64],
685    rehash: bool,
686) -> Result<()> {
687    downcast_primitive_array! {
688        array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
689        DataType::Null => hash_null(random_state, hashes_buffer, rehash),
690        DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash),
691        DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash),
692        DataType::Utf8View => hash_generic_byte_view_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
693        DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash),
694        DataType::Binary => hash_array(&as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
695        DataType::BinaryView => hash_generic_byte_view_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
696        DataType::LargeBinary => hash_array(&as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
697        DataType::FixedSizeBinary(_) => {
698            let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
699            hash_array(&array, random_state, hashes_buffer, rehash)
700        }
701        DataType::Dictionary(_, _) => downcast_dictionary_array! {
702            array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
703            _ => unreachable!()
704        }
705        DataType::Struct(_) => {
706            let array = as_struct_array(array)?;
707            hash_struct_array(array, random_state, hashes_buffer)?;
708        }
709        DataType::List(_) => {
710            let array = as_list_array(array)?;
711            hash_list_array(array, random_state, hashes_buffer)?;
712        }
713        DataType::LargeList(_) => {
714            let array = as_large_list_array(array)?;
715            hash_list_array(array, random_state, hashes_buffer)?;
716        }
717        DataType::Map(_, _) => {
718            let array = as_map_array(array)?;
719            hash_map_array(array, random_state, hashes_buffer)?;
720        }
721        DataType::FixedSizeList(_,_) => {
722            let array = as_fixed_size_list_array(array)?;
723            hash_fixed_list_array(array, random_state, hashes_buffer)?;
724        }
725        DataType::Union(_, _) => {
726            let array = as_union_array(array)?;
727            hash_union_array(array, random_state, hashes_buffer)?;
728        }
729        DataType::RunEndEncoded(_, _) => downcast_run_array! {
730            array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
731            _ => unreachable!()
732        }
733        _ => {
734            // This is internal because we should have caught this before.
735            return _internal_err!(
736                "Unsupported data type in hasher: {}",
737                array.data_type()
738            );
739        }
740    }
741    Ok(())
742}
743
744/// Test version of `hash_single_array` that forces all hashes to collide to zero.
745#[cfg(feature = "force_hash_collisions")]
746fn hash_single_array(
747    _array: &dyn Array,
748    _random_state: &RandomState,
749    hashes_buffer: &mut [u64],
750    _rehash: bool,
751) -> Result<()> {
752    for hash in hashes_buffer.iter_mut() {
753        *hash = 0
754    }
755    Ok(())
756}
757
758/// Something that can be returned as a `&dyn Array`.
759///
760/// We want `create_hashes` to accept either `&dyn Array` or `ArrayRef`,
761/// and this seems the best way to do so.
762///
763/// We tried having it accept `AsRef<dyn Array>`
764/// but that is not implemented for and cannot be implemented for
765/// `&dyn Array` so callers that have the latter would not be able
766/// to call `create_hashes` directly. This shim trait makes it possible.
767pub trait AsDynArray {
768    fn as_dyn_array(&self) -> &dyn Array;
769}
770
771impl AsDynArray for dyn Array {
772    fn as_dyn_array(&self) -> &dyn Array {
773        self
774    }
775}
776
777impl AsDynArray for &dyn Array {
778    fn as_dyn_array(&self) -> &dyn Array {
779        *self
780    }
781}
782
783impl AsDynArray for ArrayRef {
784    fn as_dyn_array(&self) -> &dyn Array {
785        self.as_ref()
786    }
787}
788
789impl AsDynArray for &ArrayRef {
790    fn as_dyn_array(&self) -> &dyn Array {
791        self.as_ref()
792    }
793}
794
795/// Creates hash values for every row, based on the values in the columns.
796///
797/// The number of rows to hash is determined by `hashes_buffer.len()`.
798/// `hashes_buffer` should be pre-sized appropriately.
799pub fn create_hashes<'a, I, T>(
800    arrays: I,
801    random_state: &RandomState,
802    hashes_buffer: &'a mut [u64],
803) -> Result<&'a mut [u64]>
804where
805    I: IntoIterator<Item = T>,
806    T: AsDynArray,
807{
808    for (i, array) in arrays.into_iter().enumerate() {
809        // combine hashes with `combine_hashes` for all columns besides the first
810        let rehash = i >= 1;
811        hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?;
812    }
813    Ok(hashes_buffer)
814}
815
816#[cfg(test)]
817mod tests {
818    use std::sync::Arc;
819
820    use arrow::array::*;
821    #[cfg(not(feature = "force_hash_collisions"))]
822    use arrow::datatypes::*;
823
824    use super::*;
825
826    #[test]
827    fn create_hashes_for_decimal_array() -> Result<()> {
828        let array = vec![1, 2, 3, 4]
829            .into_iter()
830            .map(Some)
831            .collect::<Decimal128Array>()
832            .with_precision_and_scale(20, 3)
833            .unwrap();
834        let array_ref: ArrayRef = Arc::new(array);
835        let random_state = RandomState::with_seeds(0, 0, 0, 0);
836        let hashes_buff = &mut vec![0; array_ref.len()];
837        let hashes = create_hashes(&[array_ref], &random_state, hashes_buff)?;
838        assert_eq!(hashes.len(), 4);
839        Ok(())
840    }
841
842    #[test]
843    fn create_hashes_for_empty_fixed_size_lit() -> Result<()> {
844        let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
845        let random_state = RandomState::with_seeds(0, 0, 0, 0);
846        let hashes_buff = &mut [0; 0];
847        let hashes = create_hashes(
848            &[Arc::new(empty_array) as ArrayRef],
849            &random_state,
850            hashes_buff,
851        )?;
852        assert_eq!(hashes, &Vec::<u64>::new());
853        Ok(())
854    }
855
856    #[test]
857    fn create_hashes_for_float_arrays() -> Result<()> {
858        let f32_arr: ArrayRef =
859            Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
860        let f64_arr: ArrayRef =
861            Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
862
863        let random_state = RandomState::with_seeds(0, 0, 0, 0);
864        let hashes_buff = &mut vec![0; f32_arr.len()];
865        let hashes = create_hashes(&[f32_arr], &random_state, hashes_buff)?;
866        assert_eq!(hashes.len(), 4,);
867
868        let hashes = create_hashes(&[f64_arr], &random_state, hashes_buff)?;
869        assert_eq!(hashes.len(), 4,);
870
871        Ok(())
872    }
873
874    macro_rules! create_hash_binary {
875        ($NAME:ident, $ARRAY:ty) => {
876            #[cfg(not(feature = "force_hash_collisions"))]
877            #[test]
878            fn $NAME() {
879                let binary = [
880                    Some(b"short".to_byte_slice()),
881                    None,
882                    Some(b"long but different 12 bytes string"),
883                    Some(b"short2"),
884                    Some(b"Longer than 12 bytes string"),
885                    Some(b"short"),
886                    Some(b"Longer than 12 bytes string"),
887                ];
888
889                let binary_array: ArrayRef =
890                    Arc::new(binary.iter().cloned().collect::<$ARRAY>());
891
892                let random_state = RandomState::with_seeds(0, 0, 0, 0);
893
894                let mut binary_hashes = vec![0; binary.len()];
895                create_hashes(&[binary_array], &random_state, &mut binary_hashes)
896                    .unwrap();
897
898                // Null values result in a zero hash,
899                for (val, hash) in binary.iter().zip(binary_hashes.iter()) {
900                    match val {
901                        Some(_) => assert_ne!(*hash, 0),
902                        None => assert_eq!(*hash, 0),
903                    }
904                }
905
906                // Same values should map to same hash values
907                assert_eq!(binary[0], binary[5]);
908                assert_eq!(binary[4], binary[6]);
909
910                // different binary should map to different hash values
911                assert_ne!(binary[0], binary[2]);
912            }
913        };
914    }
915
916    create_hash_binary!(binary_array, BinaryArray);
917    create_hash_binary!(large_binary_array, LargeBinaryArray);
918    create_hash_binary!(binary_view_array, BinaryViewArray);
919
920    #[test]
921    fn create_hashes_fixed_size_binary() -> Result<()> {
922        let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
923        let fixed_size_binary_array: ArrayRef =
924            Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap());
925
926        let random_state = RandomState::with_seeds(0, 0, 0, 0);
927        let hashes_buff = &mut vec![0; fixed_size_binary_array.len()];
928        let hashes =
929            create_hashes(&[fixed_size_binary_array], &random_state, hashes_buff)?;
930        assert_eq!(hashes.len(), 3,);
931
932        Ok(())
933    }
934
935    macro_rules! create_hash_string {
936        ($NAME:ident, $ARRAY:ty) => {
937            #[cfg(not(feature = "force_hash_collisions"))]
938            #[test]
939            fn $NAME() {
940                let strings = [
941                    Some("short"),
942                    None,
943                    Some("long but different 12 bytes string"),
944                    Some("short2"),
945                    Some("Longer than 12 bytes string"),
946                    Some("short"),
947                    Some("Longer than 12 bytes string"),
948                ];
949
950                let string_array: ArrayRef =
951                    Arc::new(strings.iter().cloned().collect::<$ARRAY>());
952                let dict_array: ArrayRef = Arc::new(
953                    strings
954                        .iter()
955                        .cloned()
956                        .collect::<DictionaryArray<Int8Type>>(),
957                );
958
959                let random_state = RandomState::with_seeds(0, 0, 0, 0);
960
961                let mut string_hashes = vec![0; strings.len()];
962                create_hashes(&[string_array], &random_state, &mut string_hashes)
963                    .unwrap();
964
965                let mut dict_hashes = vec![0; strings.len()];
966                create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();
967
968                // Null values result in a zero hash,
969                for (val, hash) in strings.iter().zip(string_hashes.iter()) {
970                    match val {
971                        Some(_) => assert_ne!(*hash, 0),
972                        None => assert_eq!(*hash, 0),
973                    }
974                }
975
976                // same logical values should hash to the same hash value
977                assert_eq!(string_hashes, dict_hashes);
978
979                // Same values should map to same hash values
980                assert_eq!(strings[0], strings[5]);
981                assert_eq!(strings[4], strings[6]);
982
983                // different strings should map to different hash values
984                assert_ne!(strings[0], strings[2]);
985            }
986        };
987    }
988
989    create_hash_string!(string_array, StringArray);
990    create_hash_string!(large_string_array, LargeStringArray);
991    create_hash_string!(string_view_array, StringArray);
992    create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
993
994    #[test]
995    #[cfg(not(feature = "force_hash_collisions"))]
996    fn create_hashes_for_run_array() -> Result<()> {
997        let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
998        let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
999        let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1000
1001        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1002        let hashes_buff = &mut vec![0; array.len()];
1003        let hashes = create_hashes(
1004            &[Arc::clone(&array) as ArrayRef],
1005            &random_state,
1006            hashes_buff,
1007        )?;
1008
1009        assert_eq!(hashes.len(), 7);
1010        assert_eq!(hashes[0], hashes[1]);
1011        assert_eq!(hashes[2], hashes[3]);
1012        assert_eq!(hashes[3], hashes[4]);
1013        assert_eq!(hashes[5], hashes[6]);
1014        assert_ne!(hashes[0], hashes[2]);
1015        assert_ne!(hashes[2], hashes[5]);
1016        assert_ne!(hashes[0], hashes[5]);
1017
1018        Ok(())
1019    }
1020
1021    #[test]
1022    #[cfg(not(feature = "force_hash_collisions"))]
1023    fn create_multi_column_hash_with_run_array() -> Result<()> {
1024        let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));
1025        let values = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
1026        let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
1027        let run_array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1028
1029        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1030        let mut one_col_hashes = vec![0; int_array.len()];
1031        create_hashes(
1032            &[Arc::clone(&int_array) as ArrayRef],
1033            &random_state,
1034            &mut one_col_hashes,
1035        )?;
1036
1037        let mut two_col_hashes = vec![0; int_array.len()];
1038        create_hashes(
1039            &[
1040                Arc::clone(&int_array) as ArrayRef,
1041                Arc::clone(&run_array) as ArrayRef,
1042            ],
1043            &random_state,
1044            &mut two_col_hashes,
1045        )?;
1046
1047        assert_eq!(one_col_hashes.len(), 7);
1048        assert_eq!(two_col_hashes.len(), 7);
1049        assert_ne!(one_col_hashes, two_col_hashes);
1050
1051        let diff_0_vs_1_one_col = one_col_hashes[0] != one_col_hashes[1];
1052        let diff_0_vs_1_two_col = two_col_hashes[0] != two_col_hashes[1];
1053        assert_eq!(diff_0_vs_1_one_col, diff_0_vs_1_two_col);
1054
1055        let diff_2_vs_3_one_col = one_col_hashes[2] != one_col_hashes[3];
1056        let diff_2_vs_3_two_col = two_col_hashes[2] != two_col_hashes[3];
1057        assert_eq!(diff_2_vs_3_one_col, diff_2_vs_3_two_col);
1058
1059        Ok(())
1060    }
1061
1062    #[test]
1063    // Tests actual values of hashes, which are different if forcing collisions
1064    #[cfg(not(feature = "force_hash_collisions"))]
1065    fn create_hashes_for_dict_arrays() {
1066        let strings = [Some("foo"), None, Some("bar"), Some("foo"), None];
1067
1068        let string_array: ArrayRef =
1069            Arc::new(strings.iter().cloned().collect::<StringArray>());
1070        let dict_array: ArrayRef = Arc::new(
1071            strings
1072                .iter()
1073                .cloned()
1074                .collect::<DictionaryArray<Int8Type>>(),
1075        );
1076
1077        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1078
1079        let mut string_hashes = vec![0; strings.len()];
1080        create_hashes(&[string_array], &random_state, &mut string_hashes).unwrap();
1081
1082        let mut dict_hashes = vec![0; strings.len()];
1083        create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();
1084
1085        // Null values result in a zero hash,
1086        for (val, hash) in strings.iter().zip(string_hashes.iter()) {
1087            match val {
1088                Some(_) => assert_ne!(*hash, 0),
1089                None => assert_eq!(*hash, 0),
1090            }
1091        }
1092
1093        // same logical values should hash to the same hash value
1094        assert_eq!(string_hashes, dict_hashes);
1095
1096        // Same values should map to same hash values
1097        assert_eq!(strings[1], strings[4]);
1098        assert_eq!(dict_hashes[1], dict_hashes[4]);
1099        assert_eq!(strings[0], strings[3]);
1100        assert_eq!(dict_hashes[0], dict_hashes[3]);
1101
1102        // different strings should map to different hash values
1103        assert_ne!(strings[0], strings[2]);
1104        assert_ne!(dict_hashes[0], dict_hashes[2]);
1105    }
1106
1107    #[test]
1108    // Tests actual values of hashes, which are different if forcing collisions
1109    #[cfg(not(feature = "force_hash_collisions"))]
1110    fn create_hashes_for_list_arrays() {
1111        let data = vec![
1112            Some(vec![Some(0), Some(1), Some(2)]),
1113            None,
1114            Some(vec![Some(3), None, Some(5)]),
1115            Some(vec![Some(3), None, Some(5)]),
1116            None,
1117            Some(vec![Some(0), Some(1), Some(2)]),
1118            Some(vec![]),
1119        ];
1120        let list_array =
1121            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
1122        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1123        let mut hashes = vec![0; list_array.len()];
1124        create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
1125        assert_eq!(hashes[0], hashes[5]);
1126        assert_eq!(hashes[1], hashes[4]);
1127        assert_eq!(hashes[2], hashes[3]);
1128        assert_eq!(hashes[1], hashes[6]); // null vs empty list
1129    }
1130
1131    #[test]
1132    // Tests actual values of hashes, which are different if forcing collisions
1133    #[cfg(not(feature = "force_hash_collisions"))]
1134    fn create_hashes_for_fixed_size_list_arrays() {
1135        let data = vec![
1136            Some(vec![Some(0), Some(1), Some(2)]),
1137            None,
1138            Some(vec![Some(3), None, Some(5)]),
1139            Some(vec![Some(3), None, Some(5)]),
1140            None,
1141            Some(vec![Some(0), Some(1), Some(2)]),
1142        ];
1143        let list_array =
1144            Arc::new(FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
1145                data, 3,
1146            )) as ArrayRef;
1147        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1148        let mut hashes = vec![0; list_array.len()];
1149        create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
1150        assert_eq!(hashes[0], hashes[5]);
1151        assert_eq!(hashes[1], hashes[4]);
1152        assert_eq!(hashes[2], hashes[3]);
1153    }
1154
1155    #[test]
1156    // Tests actual values of hashes, which are different if forcing collisions
1157    #[cfg(not(feature = "force_hash_collisions"))]
1158    fn create_hashes_for_struct_arrays() {
1159        use arrow::buffer::Buffer;
1160
1161        let boolarr = Arc::new(BooleanArray::from(vec![
1162            false, false, true, true, true, true,
1163        ]));
1164        let i32arr = Arc::new(Int32Array::from(vec![10, 10, 20, 20, 30, 31]));
1165
1166        let struct_array = StructArray::from((
1167            vec![
1168                (
1169                    Arc::new(Field::new("bool", DataType::Boolean, false)),
1170                    Arc::clone(&boolarr) as ArrayRef,
1171                ),
1172                (
1173                    Arc::new(Field::new("i32", DataType::Int32, false)),
1174                    Arc::clone(&i32arr) as ArrayRef,
1175                ),
1176                (
1177                    Arc::new(Field::new("i32", DataType::Int32, false)),
1178                    Arc::clone(&i32arr) as ArrayRef,
1179                ),
1180                (
1181                    Arc::new(Field::new("bool", DataType::Boolean, false)),
1182                    Arc::clone(&boolarr) as ArrayRef,
1183                ),
1184            ],
1185            Buffer::from(&[0b001011]),
1186        ));
1187
1188        assert!(struct_array.is_valid(0));
1189        assert!(struct_array.is_valid(1));
1190        assert!(struct_array.is_null(2));
1191        assert!(struct_array.is_valid(3));
1192        assert!(struct_array.is_null(4));
1193        assert!(struct_array.is_null(5));
1194
1195        let array = Arc::new(struct_array) as ArrayRef;
1196
1197        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1198        let mut hashes = vec![0; array.len()];
1199        create_hashes(&[array], &random_state, &mut hashes).unwrap();
1200        assert_eq!(hashes[0], hashes[1]);
1201        // same value but the third row ( hashes[2] ) is null
1202        assert_ne!(hashes[2], hashes[3]);
1203        // different values but both are null
1204        assert_eq!(hashes[4], hashes[5]);
1205    }
1206
1207    #[test]
1208    // Tests actual values of hashes, which are different if forcing collisions
1209    #[cfg(not(feature = "force_hash_collisions"))]
1210    fn create_hashes_for_struct_arrays_more_column_than_row() {
1211        let struct_array = StructArray::from(vec![
1212            (
1213                Arc::new(Field::new("bool", DataType::Boolean, false)),
1214                Arc::new(BooleanArray::from(vec![false, false])) as ArrayRef,
1215            ),
1216            (
1217                Arc::new(Field::new("i32-1", DataType::Int32, false)),
1218                Arc::new(Int32Array::from(vec![10, 10])) as ArrayRef,
1219            ),
1220            (
1221                Arc::new(Field::new("i32-2", DataType::Int32, false)),
1222                Arc::new(Int32Array::from(vec![10, 10])) as ArrayRef,
1223            ),
1224            (
1225                Arc::new(Field::new("i32-3", DataType::Int32, false)),
1226                Arc::new(Int32Array::from(vec![10, 10])) as ArrayRef,
1227            ),
1228        ]);
1229
1230        assert!(struct_array.is_valid(0));
1231        assert!(struct_array.is_valid(1));
1232
1233        let array = Arc::new(struct_array) as ArrayRef;
1234        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1235        let mut hashes = vec![0; array.len()];
1236        create_hashes(&[array], &random_state, &mut hashes).unwrap();
1237        assert_eq!(hashes[0], hashes[1]);
1238    }
1239
1240    #[test]
1241    // Tests actual values of hashes, which are different if forcing collisions
1242    #[cfg(not(feature = "force_hash_collisions"))]
1243    fn create_hashes_for_map_arrays() {
1244        let mut builder =
1245            MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
1246        // Row 0
1247        builder.keys().append_value("key1");
1248        builder.keys().append_value("key2");
1249        builder.values().append_value(1);
1250        builder.values().append_value(2);
1251        builder.append(true).unwrap();
1252        // Row 1
1253        builder.keys().append_value("key1");
1254        builder.keys().append_value("key2");
1255        builder.values().append_value(1);
1256        builder.values().append_value(2);
1257        builder.append(true).unwrap();
1258        // Row 2
1259        builder.keys().append_value("key1");
1260        builder.keys().append_value("key2");
1261        builder.values().append_value(1);
1262        builder.values().append_value(3);
1263        builder.append(true).unwrap();
1264        // Row 3
1265        builder.keys().append_value("key1");
1266        builder.keys().append_value("key3");
1267        builder.values().append_value(1);
1268        builder.values().append_value(2);
1269        builder.append(true).unwrap();
1270        // Row 4
1271        builder.keys().append_value("key1");
1272        builder.values().append_value(1);
1273        builder.append(true).unwrap();
1274        // Row 5
1275        builder.keys().append_value("key1");
1276        builder.values().append_null();
1277        builder.append(true).unwrap();
1278        // Row 6
1279        builder.append(true).unwrap();
1280        // Row 7
1281        builder.keys().append_value("key1");
1282        builder.values().append_value(1);
1283        builder.append(false).unwrap();
1284
1285        let array = Arc::new(builder.finish()) as ArrayRef;
1286
1287        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1288        let mut hashes = vec![0; array.len()];
1289        create_hashes(&[array], &random_state, &mut hashes).unwrap();
1290        assert_eq!(hashes[0], hashes[1]); // same value
1291        assert_ne!(hashes[0], hashes[2]); // different value
1292        assert_ne!(hashes[0], hashes[3]); // different key
1293        assert_ne!(hashes[0], hashes[4]); // missing an entry
1294        assert_ne!(hashes[4], hashes[5]); // filled vs null value
1295        assert_eq!(hashes[6], hashes[7]); // empty vs null map
1296    }
1297
1298    #[test]
1299    // Tests actual values of hashes, which are different if forcing collisions
1300    #[cfg(not(feature = "force_hash_collisions"))]
1301    fn create_multi_column_hash_for_dict_arrays() {
1302        let strings1 = [Some("foo"), None, Some("bar")];
1303        let strings2 = [Some("blarg"), Some("blah"), None];
1304
1305        let string_array: ArrayRef =
1306            Arc::new(strings1.iter().cloned().collect::<StringArray>());
1307        let dict_array: ArrayRef = Arc::new(
1308            strings2
1309                .iter()
1310                .cloned()
1311                .collect::<DictionaryArray<Int32Type>>(),
1312        );
1313
1314        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1315
1316        let mut one_col_hashes = vec![0; strings1.len()];
1317        create_hashes(
1318            &[Arc::clone(&dict_array) as ArrayRef],
1319            &random_state,
1320            &mut one_col_hashes,
1321        )
1322        .unwrap();
1323
1324        let mut two_col_hashes = vec![0; strings1.len()];
1325        create_hashes(
1326            &[dict_array, string_array],
1327            &random_state,
1328            &mut two_col_hashes,
1329        )
1330        .unwrap();
1331
1332        assert_eq!(one_col_hashes.len(), 3);
1333        assert_eq!(two_col_hashes.len(), 3);
1334
1335        assert_ne!(one_col_hashes, two_col_hashes);
1336    }
1337
1338    #[test]
1339    fn test_create_hashes_from_arrays() {
1340        let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1341        let float_array: ArrayRef =
1342            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
1343
1344        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1345        let hashes_buff = &mut vec![0; int_array.len()];
1346        let hashes =
1347            create_hashes(&[int_array, float_array], &random_state, hashes_buff).unwrap();
1348        assert_eq!(hashes.len(), 4,);
1349    }
1350
1351    #[test]
1352    fn test_create_hashes_from_dyn_arrays() {
1353        let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1354        let float_array: ArrayRef =
1355            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
1356
1357        // Verify that we can call create_hashes with only &dyn Array
1358        fn test(arr1: &dyn Array, arr2: &dyn Array) {
1359            let random_state = RandomState::with_seeds(0, 0, 0, 0);
1360            let hashes_buff = &mut vec![0; arr1.len()];
1361            let hashes = create_hashes([arr1, arr2], &random_state, hashes_buff).unwrap();
1362            assert_eq!(hashes.len(), 4,);
1363        }
1364        test(&*int_array, &*float_array);
1365    }
1366
1367    #[test]
1368    fn test_create_hashes_equivalence() {
1369        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1370        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1371
1372        let mut hashes1 = vec![0; array.len()];
1373        create_hashes(
1374            &[Arc::clone(&array) as ArrayRef],
1375            &random_state,
1376            &mut hashes1,
1377        )
1378        .unwrap();
1379
1380        let mut hashes2 = vec![0; array.len()];
1381        create_hashes([array], &random_state, &mut hashes2).unwrap();
1382
1383        assert_eq!(hashes1, hashes2);
1384    }
1385
1386    #[test]
1387    fn test_with_hashes() {
1388        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1389        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1390
1391        // Test that with_hashes produces the same results as create_hashes
1392        let mut expected_hashes = vec![0; array.len()];
1393        create_hashes([&array], &random_state, &mut expected_hashes).unwrap();
1394
1395        let result = with_hashes([&array], &random_state, |hashes| {
1396            assert_eq!(hashes.len(), 4);
1397            // Verify hashes match expected values
1398            assert_eq!(hashes, &expected_hashes[..]);
1399            // Return a copy of the hashes
1400            Ok(hashes.to_vec())
1401        })
1402        .unwrap();
1403
1404        // Verify callback result is returned correctly
1405        assert_eq!(result, expected_hashes);
1406    }
1407
1408    #[test]
1409    fn test_with_hashes_multi_column() {
1410        let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
1411        let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
1412        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1413
1414        // Test multi-column hashing
1415        let mut expected_hashes = vec![0; int_array.len()];
1416        create_hashes(
1417            [&int_array, &str_array],
1418            &random_state,
1419            &mut expected_hashes,
1420        )
1421        .unwrap();
1422
1423        with_hashes([&int_array, &str_array], &random_state, |hashes| {
1424            assert_eq!(hashes.len(), 3);
1425            assert_eq!(hashes, &expected_hashes[..]);
1426            Ok(())
1427        })
1428        .unwrap();
1429    }
1430
1431    #[test]
1432    fn test_with_hashes_empty_arrays() {
1433        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1434
1435        // Test that passing no arrays returns an error
1436        let empty: [&ArrayRef; 0] = [];
1437        let result = with_hashes(empty, &random_state, |_hashes| Ok(()));
1438
1439        assert!(result.is_err());
1440        assert!(
1441            result
1442                .unwrap_err()
1443                .to_string()
1444                .contains("requires at least one array")
1445        );
1446    }
1447
1448    #[test]
1449    fn test_with_hashes_reentrancy() {
1450        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
1451        let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
1452        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1453
1454        // Test that reentrant calls return an error instead of panicking
1455        let result = with_hashes([&array], &random_state, |_hashes| {
1456            // Try to call with_hashes again inside the callback
1457            with_hashes([&array2], &random_state, |_inner_hashes| Ok(()))
1458        });
1459
1460        assert!(result.is_err());
1461        let err_msg = result.unwrap_err().to_string();
1462        assert!(
1463            err_msg.contains("reentrantly") || err_msg.contains("cannot be called"),
1464            "Error message should mention reentrancy: {err_msg}",
1465        );
1466    }
1467
1468    #[test]
1469    #[cfg(not(feature = "force_hash_collisions"))]
1470    fn create_hashes_for_sparse_union_arrays() {
1471        // logical array: [int(5), str("foo"), int(10), int(5)]
1472        let int_array = Int32Array::from(vec![Some(5), None, Some(10), Some(5)]);
1473        let str_array = StringArray::from(vec![None, Some("foo"), None, None]);
1474
1475        let type_ids = vec![0_i8, 1, 0, 0].into();
1476        let children = vec![
1477            Arc::new(int_array) as ArrayRef,
1478            Arc::new(str_array) as ArrayRef,
1479        ];
1480
1481        let union_fields = [
1482            (0, Arc::new(Field::new("a", DataType::Int32, true))),
1483            (1, Arc::new(Field::new("b", DataType::Utf8, true))),
1484        ]
1485        .into_iter()
1486        .collect();
1487
1488        let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
1489        let array_ref = Arc::new(array) as ArrayRef;
1490
1491        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1492        let mut hashes = vec![0; array_ref.len()];
1493        create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1494
1495        // Rows 0 and 3 both have type_id=0 (int) with value 5
1496        assert_eq!(hashes[0], hashes[3]);
1497        // Row 0 (int 5) vs Row 2 (int 10) - different values
1498        assert_ne!(hashes[0], hashes[2]);
1499        // Row 0 (int) vs Row 1 (string) - different types
1500        assert_ne!(hashes[0], hashes[1]);
1501    }
1502
1503    #[test]
1504    #[cfg(not(feature = "force_hash_collisions"))]
1505    fn create_hashes_for_sparse_union_arrays_with_nulls() {
1506        // logical array: [int(5), str("foo"), int(null), str(null)]
1507        let int_array = Int32Array::from(vec![Some(5), None, None, None]);
1508        let str_array = StringArray::from(vec![None, Some("foo"), None, None]);
1509
1510        let type_ids = vec![0, 1, 0, 1].into();
1511        let children = vec![
1512            Arc::new(int_array) as ArrayRef,
1513            Arc::new(str_array) as ArrayRef,
1514        ];
1515
1516        let union_fields = [
1517            (0, Arc::new(Field::new("a", DataType::Int32, true))),
1518            (1, Arc::new(Field::new("b", DataType::Utf8, true))),
1519        ]
1520        .into_iter()
1521        .collect();
1522
1523        let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
1524        let array_ref = Arc::new(array) as ArrayRef;
1525
1526        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1527        let mut hashes = vec![0; array_ref.len()];
1528        create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1529
1530        // row 2 (int null) and row 3 (str null) should have the same hash
1531        // because they are both null values
1532        assert_eq!(hashes[2], hashes[3]);
1533
1534        // row 0 (int 5) vs row 2 (int null) - different (value vs null)
1535        assert_ne!(hashes[0], hashes[2]);
1536
1537        // row 1 (str "foo") vs row 3 (str null) - different (value vs null)
1538        assert_ne!(hashes[1], hashes[3]);
1539    }
1540
1541    #[test]
1542    #[cfg(not(feature = "force_hash_collisions"))]
1543    fn create_hashes_for_dense_union_arrays() {
1544        // creates a dense union array with int and string types
1545        // [67, "norm", 100, "macdonald", 67]
1546        let int_array = Int32Array::from(vec![67, 100, 67]);
1547        let str_array = StringArray::from(vec!["norm", "macdonald"]);
1548
1549        let type_ids = vec![0, 1, 0, 1, 0].into();
1550        let offsets = vec![0, 0, 1, 1, 2].into();
1551        let children = vec![
1552            Arc::new(int_array) as ArrayRef,
1553            Arc::new(str_array) as ArrayRef,
1554        ];
1555
1556        let union_fields = [
1557            (0, Arc::new(Field::new("a", DataType::Int32, false))),
1558            (1, Arc::new(Field::new("b", DataType::Utf8, false))),
1559        ]
1560        .into_iter()
1561        .collect();
1562
1563        let array =
1564            UnionArray::try_new(union_fields, type_ids, Some(offsets), children).unwrap();
1565        let array_ref = Arc::new(array) as ArrayRef;
1566
1567        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1568        let mut hashes = vec![0; array_ref.len()];
1569        create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1570
1571        // 67 vs "norm"
1572        assert_ne!(hashes[0], hashes[1]);
1573        // 67 vs 100
1574        assert_ne!(hashes[0], hashes[2]);
1575        // "norm" vs "macdonald"
1576        assert_ne!(hashes[1], hashes[3]);
1577        // 100 vs "macdonald"
1578        assert_ne!(hashes[2], hashes[3]);
1579        // 67 vs 67
1580        assert_eq!(hashes[0], hashes[4]);
1581    }
1582
1583    #[test]
1584    #[cfg(not(feature = "force_hash_collisions"))]
1585    fn create_hashes_for_sliced_run_array() -> Result<()> {
1586        let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
1587        let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
1588        let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1589
1590        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1591        let mut full_hashes = vec![0; array.len()];
1592        create_hashes(
1593            &[Arc::clone(&array) as ArrayRef],
1594            &random_state,
1595            &mut full_hashes,
1596        )?;
1597
1598        let array_ref: ArrayRef = Arc::clone(&array) as ArrayRef;
1599        let sliced_array = array_ref.slice(2, 3);
1600
1601        let mut sliced_hashes = vec![0; sliced_array.len()];
1602        create_hashes(
1603            std::slice::from_ref(&sliced_array),
1604            &random_state,
1605            &mut sliced_hashes,
1606        )?;
1607
1608        assert_eq!(sliced_hashes.len(), 3);
1609        assert_eq!(sliced_hashes[0], sliced_hashes[1]);
1610        assert_eq!(sliced_hashes[1], sliced_hashes[2]);
1611        assert_eq!(&sliced_hashes, &full_hashes[2..5]);
1612
1613        Ok(())
1614    }
1615
1616    #[test]
1617    #[cfg(not(feature = "force_hash_collisions"))]
1618    fn test_run_array_with_nulls() -> Result<()> {
1619        let values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
1620        let run_ends = Arc::new(Int32Array::from(vec![2, 4, 6]));
1621        let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1622
1623        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1624        let mut hashes = vec![0; array.len()];
1625        create_hashes(
1626            &[Arc::clone(&array) as ArrayRef],
1627            &random_state,
1628            &mut hashes,
1629        )?;
1630
1631        assert_eq!(hashes[0], hashes[1]);
1632        assert_ne!(hashes[0], 0);
1633        assert_eq!(hashes[2], hashes[3]);
1634        assert_eq!(hashes[2], 0);
1635        assert_eq!(hashes[4], hashes[5]);
1636        assert_ne!(hashes[4], 0);
1637        assert_ne!(hashes[0], hashes[4]);
1638
1639        Ok(())
1640    }
1641
1642    #[test]
1643    #[cfg(not(feature = "force_hash_collisions"))]
1644    fn test_run_array_with_nulls_multicolumn() -> Result<()> {
1645        let primitive_array = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
1646        let run_values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
1647        let run_ends = Arc::new(Int32Array::from(vec![1, 2, 3]));
1648        let run_array =
1649            Arc::new(RunArray::try_new(&run_ends, run_values.as_ref()).unwrap());
1650        let second_col = Arc::new(Int32Array::from(vec![100, 200, 300]));
1651
1652        let random_state = RandomState::with_seeds(0, 0, 0, 0);
1653
1654        let mut primitive_hashes = vec![0; 3];
1655        create_hashes(
1656            &[
1657                Arc::clone(&primitive_array) as ArrayRef,
1658                Arc::clone(&second_col) as ArrayRef,
1659            ],
1660            &random_state,
1661            &mut primitive_hashes,
1662        )?;
1663
1664        let mut run_hashes = vec![0; 3];
1665        create_hashes(
1666            &[
1667                Arc::clone(&run_array) as ArrayRef,
1668                Arc::clone(&second_col) as ArrayRef,
1669            ],
1670            &random_state,
1671            &mut run_hashes,
1672        )?;
1673
1674        assert_eq!(primitive_hashes, run_hashes);
1675
1676        Ok(())
1677    }
1678}