1use ahash::RandomState;
21use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
22use arrow::array::*;
23use arrow::compute::take;
24use arrow::datatypes::*;
25#[cfg(not(feature = "force_hash_collisions"))]
26use arrow::{downcast_dictionary_array, downcast_primitive_array};
27use itertools::Itertools;
28use std::collections::HashMap;
29
30#[cfg(not(feature = "force_hash_collisions"))]
31use crate::cast::{
32 as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
33 as_generic_binary_array, as_large_list_array, as_large_list_view_array,
34 as_list_array, as_list_view_array, as_map_array, as_string_array,
35 as_string_view_array, as_struct_array, as_union_array,
36};
37use crate::error::Result;
38use crate::error::{_internal_datafusion_err, _internal_err};
39use std::cell::RefCell;
40
41#[inline]
43pub fn combine_hashes(l: u64, r: u64) -> u64 {
44 let hash = (17 * 37u64).wrapping_add(l);
45 hash.wrapping_mul(37).wrapping_add(r)
46}
47
48const MAX_BUFFER_SIZE: usize = 524_288;
53
54thread_local! {
55 static HASH_BUFFER: RefCell<Vec<u64>> = const { RefCell::new(Vec::new()) };
60}
61
62pub fn with_hashes<I, T, F, R>(
94 arrays: I,
95 random_state: &RandomState,
96 callback: F,
97) -> Result<R>
98where
99 I: IntoIterator<Item = T>,
100 T: AsDynArray,
101 F: FnOnce(&[u64]) -> Result<R>,
102{
103 let mut iter = arrays.into_iter().peekable();
105
106 let required_size = match iter.peek() {
108 Some(arr) => arr.as_dyn_array().len(),
109 None => return _internal_err!("with_hashes requires at least one array"),
110 };
111
112 HASH_BUFFER.try_with(|cell| {
113 let mut buffer = cell.try_borrow_mut()
114 .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?;
115
116 buffer.clear();
118 buffer.resize(required_size, 0);
119
120 create_hashes(iter, random_state, &mut buffer[..required_size])?;
122
123 let result = callback(&buffer[..required_size])?;
125
126 if buffer.capacity() > MAX_BUFFER_SIZE {
128 buffer.truncate(MAX_BUFFER_SIZE);
129 buffer.shrink_to_fit();
130 }
131
132 Ok(result)
133 }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access thread-local storage during or after thread destruction"))?
134}
135
136#[cfg(not(feature = "force_hash_collisions"))]
137fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
138 if mul_col {
139 hashes_buffer.iter_mut().for_each(|hash| {
140 *hash = combine_hashes(random_state.hash_one(1), *hash);
142 })
143 } else {
144 hashes_buffer.iter_mut().for_each(|hash| {
145 *hash = random_state.hash_one(1);
146 })
147 }
148}
149
150pub trait HashValue {
151 fn hash_one(&self, state: &RandomState) -> u64;
152}
153
154impl<T: HashValue + ?Sized> HashValue for &T {
155 fn hash_one(&self, state: &RandomState) -> u64 {
156 T::hash_one(self, state)
157 }
158}
159
160macro_rules! hash_value {
161 ($($t:ty),+) => {
162 $(impl HashValue for $t {
163 fn hash_one(&self, state: &RandomState) -> u64 {
164 state.hash_one(self)
165 }
166 })+
167 };
168}
169hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64, u128);
170hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano);
171
172macro_rules! hash_float_value {
173 ($(($t:ty, $i:ty)),+) => {
174 $(impl HashValue for $t {
175 fn hash_one(&self, state: &RandomState) -> u64 {
176 state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes()))
177 }
178 })+
179 };
180}
181hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
182
183#[cfg(not(feature = "force_hash_collisions"))]
187fn hash_array_primitive<T>(
188 array: &PrimitiveArray<T>,
189 random_state: &RandomState,
190 hashes_buffer: &mut [u64],
191 rehash: bool,
192) where
193 T: ArrowPrimitiveType<Native: HashValue>,
194{
195 assert_eq!(
196 hashes_buffer.len(),
197 array.len(),
198 "hashes_buffer and array should be of equal length"
199 );
200
201 if array.null_count() == 0 {
202 if rehash {
203 for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) {
204 *hash = combine_hashes(value.hash_one(random_state), *hash);
205 }
206 } else {
207 for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) {
208 *hash = value.hash_one(random_state);
209 }
210 }
211 } else if rehash {
212 for (i, hash) in hashes_buffer.iter_mut().enumerate() {
213 if !array.is_null(i) {
214 let value = unsafe { array.value_unchecked(i) };
215 *hash = combine_hashes(value.hash_one(random_state), *hash);
216 }
217 }
218 } else {
219 for (i, hash) in hashes_buffer.iter_mut().enumerate() {
220 if !array.is_null(i) {
221 let value = unsafe { array.value_unchecked(i) };
222 *hash = value.hash_one(random_state);
223 }
224 }
225 }
226}
227
228#[cfg(not(feature = "force_hash_collisions"))]
232fn hash_array<T>(
233 array: &T,
234 random_state: &RandomState,
235 hashes_buffer: &mut [u64],
236 rehash: bool,
237) where
238 T: ArrayAccessor,
239 T::Item: HashValue,
240{
241 assert_eq!(
242 hashes_buffer.len(),
243 array.len(),
244 "hashes_buffer and array should be of equal length"
245 );
246
247 if array.null_count() == 0 {
248 if rehash {
249 for (i, hash) in hashes_buffer.iter_mut().enumerate() {
250 let value = unsafe { array.value_unchecked(i) };
251 *hash = combine_hashes(value.hash_one(random_state), *hash);
252 }
253 } else {
254 for (i, hash) in hashes_buffer.iter_mut().enumerate() {
255 let value = unsafe { array.value_unchecked(i) };
256 *hash = value.hash_one(random_state);
257 }
258 }
259 } else if rehash {
260 for (i, hash) in hashes_buffer.iter_mut().enumerate() {
261 if !array.is_null(i) {
262 let value = unsafe { array.value_unchecked(i) };
263 *hash = combine_hashes(value.hash_one(random_state), *hash);
264 }
265 }
266 } else {
267 for (i, hash) in hashes_buffer.iter_mut().enumerate() {
268 if !array.is_null(i) {
269 let value = unsafe { array.value_unchecked(i) };
270 *hash = value.hash_one(random_state);
271 }
272 }
273 }
274}
275
276#[inline(never)]
284fn hash_string_view_array_inner<
285 T: ByteViewType,
286 const HAS_NULLS: bool,
287 const HAS_BUFFERS: bool,
288 const REHASH: bool,
289>(
290 array: &GenericByteViewArray<T>,
291 random_state: &RandomState,
292 hashes_buffer: &mut [u64],
293) {
294 assert_eq!(
295 hashes_buffer.len(),
296 array.len(),
297 "hashes_buffer and array should be of equal length"
298 );
299
300 let buffers = array.data_buffers();
301 let view_bytes = |view_len: u32, view: u128| {
302 let view = ByteView::from(view);
303 let offset = view.offset as usize;
304 unsafe {
306 let data = buffers.get_unchecked(view.buffer_index as usize);
307 data.get_unchecked(offset..offset + view_len as usize)
308 }
309 };
310
311 let hashes_and_views = hashes_buffer.iter_mut().zip(array.views().iter());
312 for (i, (hash, &v)) in hashes_and_views.enumerate() {
313 if HAS_NULLS && array.is_null(i) {
314 continue;
315 }
316 let view_len = v as u32;
317 if !HAS_BUFFERS || view_len <= 12 {
319 if REHASH {
320 *hash = combine_hashes(v.hash_one(random_state), *hash);
321 } else {
322 *hash = v.hash_one(random_state);
323 }
324 continue;
325 }
326 let value = view_bytes(view_len, v);
328 if REHASH {
329 *hash = combine_hashes(value.hash_one(random_state), *hash);
330 } else {
331 *hash = value.hash_one(random_state);
332 }
333 }
334}
335
336#[cfg(not(feature = "force_hash_collisions"))]
340fn hash_generic_byte_view_array<T: ByteViewType>(
341 array: &GenericByteViewArray<T>,
342 random_state: &RandomState,
343 hashes_buffer: &mut [u64],
344 rehash: bool,
345) {
346 match (
348 array.null_count() != 0,
349 !array.data_buffers().is_empty(),
350 rehash,
351 ) {
352 (false, false, false) => {
355 for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) {
356 *hash = view.hash_one(random_state);
357 }
358 }
359 (false, false, true) => {
360 for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) {
361 *hash = combine_hashes(view.hash_one(random_state), *hash);
362 }
363 }
364 (false, true, false) => hash_string_view_array_inner::<T, false, true, false>(
365 array,
366 random_state,
367 hashes_buffer,
368 ),
369 (false, true, true) => hash_string_view_array_inner::<T, false, true, true>(
370 array,
371 random_state,
372 hashes_buffer,
373 ),
374 (true, false, false) => hash_string_view_array_inner::<T, true, false, false>(
375 array,
376 random_state,
377 hashes_buffer,
378 ),
379 (true, false, true) => hash_string_view_array_inner::<T, true, false, true>(
380 array,
381 random_state,
382 hashes_buffer,
383 ),
384 (true, true, false) => hash_string_view_array_inner::<T, true, true, false>(
385 array,
386 random_state,
387 hashes_buffer,
388 ),
389 (true, true, true) => hash_string_view_array_inner::<T, true, true, true>(
390 array,
391 random_state,
392 hashes_buffer,
393 ),
394 }
395}
396
397#[inline(never)]
404fn hash_dictionary_inner<
405 K: ArrowDictionaryKeyType,
406 const HAS_NULL_KEYS: bool,
407 const HAS_NULL_VALUES: bool,
408 const MULTI_COL: bool,
409>(
410 array: &DictionaryArray<K>,
411 random_state: &RandomState,
412 hashes_buffer: &mut [u64],
413) -> Result<()> {
414 let dict_values = array.values();
418 let mut dict_hashes = vec![0; dict_values.len()];
419 create_hashes([dict_values], random_state, &mut dict_hashes)?;
420
421 if HAS_NULL_KEYS {
422 for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
423 if let Some(key) = key {
424 let idx = key.as_usize();
425 if !HAS_NULL_VALUES || dict_values.is_valid(idx) {
426 if MULTI_COL {
427 *hash = combine_hashes(dict_hashes[idx], *hash);
428 } else {
429 *hash = dict_hashes[idx];
430 }
431 }
432 }
433 }
434 } else {
435 for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().values()) {
436 let idx = key.as_usize();
437 if !HAS_NULL_VALUES || dict_values.is_valid(idx) {
438 if MULTI_COL {
439 *hash = combine_hashes(dict_hashes[idx], *hash);
440 } else {
441 *hash = dict_hashes[idx];
442 }
443 }
444 }
445 }
446 Ok(())
447}
448
449#[cfg(not(feature = "force_hash_collisions"))]
451fn hash_dictionary<K: ArrowDictionaryKeyType>(
452 array: &DictionaryArray<K>,
453 random_state: &RandomState,
454 hashes_buffer: &mut [u64],
455 multi_col: bool,
456) -> Result<()> {
457 let has_null_keys = array.keys().null_count() != 0;
458 let has_null_values = array.values().null_count() != 0;
459
460 match (has_null_keys, has_null_values, multi_col) {
463 (false, false, false) => hash_dictionary_inner::<K, false, false, false>(
464 array,
465 random_state,
466 hashes_buffer,
467 ),
468 (false, false, true) => hash_dictionary_inner::<K, false, false, true>(
469 array,
470 random_state,
471 hashes_buffer,
472 ),
473 (false, true, false) => hash_dictionary_inner::<K, false, true, false>(
474 array,
475 random_state,
476 hashes_buffer,
477 ),
478 (false, true, true) => hash_dictionary_inner::<K, false, true, true>(
479 array,
480 random_state,
481 hashes_buffer,
482 ),
483 (true, false, false) => hash_dictionary_inner::<K, true, false, false>(
484 array,
485 random_state,
486 hashes_buffer,
487 ),
488 (true, false, true) => hash_dictionary_inner::<K, true, false, true>(
489 array,
490 random_state,
491 hashes_buffer,
492 ),
493 (true, true, false) => hash_dictionary_inner::<K, true, true, false>(
494 array,
495 random_state,
496 hashes_buffer,
497 ),
498 (true, true, true) => hash_dictionary_inner::<K, true, true, true>(
499 array,
500 random_state,
501 hashes_buffer,
502 ),
503 }
504}
505
506#[cfg(not(feature = "force_hash_collisions"))]
507fn hash_struct_array(
508 array: &StructArray,
509 random_state: &RandomState,
510 hashes_buffer: &mut [u64],
511) -> Result<()> {
512 let nulls = array.nulls();
513 let row_len = array.len();
514
515 let mut values_hashes = vec![0u64; row_len];
517 create_hashes(array.columns(), random_state, &mut values_hashes)?;
518
519 if let Some(nulls) = nulls {
521 for i in nulls.valid_indices() {
522 let hash = &mut hashes_buffer[i];
523 *hash = combine_hashes(*hash, values_hashes[i]);
524 }
525 } else {
526 for i in 0..row_len {
527 let hash = &mut hashes_buffer[i];
528 *hash = combine_hashes(*hash, values_hashes[i]);
529 }
530 }
531
532 Ok(())
533}
534
535#[cfg(not(feature = "force_hash_collisions"))]
537fn hash_map_array(
538 array: &MapArray,
539 random_state: &RandomState,
540 hashes_buffer: &mut [u64],
541) -> Result<()> {
542 let nulls = array.nulls();
543 let offsets = array.offsets();
544
545 let first_offset = offsets.first().copied().unwrap_or_default() as usize;
547 let last_offset = offsets.last().copied().unwrap_or_default() as usize;
548 let entries_len = last_offset - first_offset;
549
550 let mut values_hashes = vec![0u64; entries_len];
552 let entries = array.entries();
553 let sliced_columns: Vec<ArrayRef> = entries
554 .columns()
555 .iter()
556 .map(|col| col.slice(first_offset, entries_len))
557 .collect();
558 create_hashes(&sliced_columns, random_state, &mut values_hashes)?;
559
560 if let Some(nulls) = nulls {
563 for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
564 if nulls.is_valid(i) {
565 let hash = &mut hashes_buffer[i];
566 for values_hash in &values_hashes
567 [start.as_usize() - first_offset..stop.as_usize() - first_offset]
568 {
569 *hash = combine_hashes(*hash, *values_hash);
570 }
571 }
572 }
573 } else {
574 for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
575 let hash = &mut hashes_buffer[i];
576 for values_hash in &values_hashes
577 [start.as_usize() - first_offset..stop.as_usize() - first_offset]
578 {
579 *hash = combine_hashes(*hash, *values_hash);
580 }
581 }
582 }
583
584 Ok(())
585}
586
587#[cfg(not(feature = "force_hash_collisions"))]
588fn hash_list_array<OffsetSize>(
589 array: &GenericListArray<OffsetSize>,
590 random_state: &RandomState,
591 hashes_buffer: &mut [u64],
592) -> Result<()>
593where
594 OffsetSize: OffsetSizeTrait,
595{
596 let first_offset = array.value_offsets().first().cloned().unwrap_or_default();
598 let last_offset = array.value_offsets().last().cloned().unwrap_or_default();
599 let value_bytes_len = (last_offset - first_offset).as_usize();
600 let mut values_hashes = vec![0u64; value_bytes_len];
601 create_hashes(
602 [array
603 .values()
604 .slice(first_offset.as_usize(), value_bytes_len)],
605 random_state,
606 &mut values_hashes,
607 )?;
608
609 if array.null_count() > 0 {
610 for (i, (start, stop)) in array.value_offsets().iter().tuple_windows().enumerate()
611 {
612 if array.is_valid(i) {
613 let hash = &mut hashes_buffer[i];
614 for values_hash in &values_hashes[(*start - first_offset).as_usize()
615 ..(*stop - first_offset).as_usize()]
616 {
617 *hash = combine_hashes(*hash, *values_hash);
618 }
619 }
620 }
621 } else {
622 for ((start, stop), hash) in array
623 .value_offsets()
624 .iter()
625 .tuple_windows()
626 .zip(hashes_buffer.iter_mut())
627 {
628 for values_hash in &values_hashes
629 [(*start - first_offset).as_usize()..(*stop - first_offset).as_usize()]
630 {
631 *hash = combine_hashes(*hash, *values_hash);
632 }
633 }
634 }
635 Ok(())
636}
637
638#[cfg(not(feature = "force_hash_collisions"))]
639fn hash_list_view_array<OffsetSize>(
640 array: &GenericListViewArray<OffsetSize>,
641 random_state: &RandomState,
642 hashes_buffer: &mut [u64],
643) -> Result<()>
644where
645 OffsetSize: OffsetSizeTrait,
646{
647 let values = array.values();
648 let offsets = array.value_offsets();
649 let sizes = array.value_sizes();
650 let nulls = array.nulls();
651 let mut values_hashes = vec![0u64; values.len()];
652 create_hashes([values], random_state, &mut values_hashes)?;
653 if let Some(nulls) = nulls {
654 for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
655 if nulls.is_valid(i) {
656 let hash = &mut hashes_buffer[i];
657 let start = offset.as_usize();
658 let end = start + size.as_usize();
659 for values_hash in &values_hashes[start..end] {
660 *hash = combine_hashes(*hash, *values_hash);
661 }
662 }
663 }
664 } else {
665 for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
666 let hash = &mut hashes_buffer[i];
667 let start = offset.as_usize();
668 let end = start + size.as_usize();
669 for values_hash in &values_hashes[start..end] {
670 *hash = combine_hashes(*hash, *values_hash);
671 }
672 }
673 }
674 Ok(())
675}
676
677#[cfg(not(feature = "force_hash_collisions"))]
678fn hash_union_array(
679 array: &UnionArray,
680 random_state: &RandomState,
681 hashes_buffer: &mut [u64],
682) -> Result<()> {
683 let DataType::Union(union_fields, _mode) = array.data_type() else {
684 unreachable!()
685 };
686
687 if array.is_dense() {
688 hash_union_array_default(array, union_fields, random_state, hashes_buffer)
691 } else {
692 hash_sparse_union_array(array, union_fields, random_state, hashes_buffer)
696 }
697}
698
699#[cfg(not(feature = "force_hash_collisions"))]
709fn hash_union_array_default(
710 array: &UnionArray,
711 union_fields: &UnionFields,
712 random_state: &RandomState,
713 hashes_buffer: &mut [u64],
714) -> Result<()> {
715 let mut child_hashes: HashMap<i8, Vec<u64>> =
716 HashMap::with_capacity(union_fields.len());
717
718 for (type_id, _field) in union_fields.iter() {
720 let child = array.child(type_id);
721 let mut child_hash_buffer = vec![0; child.len()];
722 create_hashes([child], random_state, &mut child_hash_buffer)?;
723
724 child_hashes.insert(type_id, child_hash_buffer);
725 }
726
727 #[expect(clippy::needless_range_loop)]
731 for i in 0..array.len() {
732 let type_id = array.type_id(i);
733 let child_offset = array.value_offset(i);
734
735 let child_hash = child_hashes.get(&type_id).expect("invalid type_id");
736 hashes_buffer[i] = combine_hashes(hashes_buffer[i], child_hash[child_offset]);
737 }
738
739 Ok(())
740}
741
742#[cfg(not(feature = "force_hash_collisions"))]
750fn hash_sparse_union_array(
751 array: &UnionArray,
752 union_fields: &UnionFields,
753 random_state: &RandomState,
754 hashes_buffer: &mut [u64],
755) -> Result<()> {
756 use std::collections::HashMap;
757
758 if union_fields.len() <= 2 {
761 return hash_union_array_default(
762 array,
763 union_fields,
764 random_state,
765 hashes_buffer,
766 );
767 }
768
769 let type_ids = array.type_ids();
770
771 let mut indices_by_type: HashMap<i8, Vec<u32>> = HashMap::new();
773 for (i, &type_id) in type_ids.iter().enumerate() {
774 indices_by_type.entry(type_id).or_default().push(i as u32);
775 }
776
777 for (type_id, _field) in union_fields.iter() {
779 if let Some(indices) = indices_by_type.get(&type_id) {
780 if indices.is_empty() {
781 continue;
782 }
783
784 let child = array.child(type_id);
785 let indices_array = UInt32Array::from(indices.clone());
786
787 let filtered = take(child.as_ref(), &indices_array, None)?;
789
790 let mut filtered_hashes = vec![0u64; filtered.len()];
792 create_hashes([&filtered], random_state, &mut filtered_hashes)?;
793
794 for (hash, &idx) in filtered_hashes.iter().zip(indices.iter()) {
796 hashes_buffer[idx as usize] =
797 combine_hashes(hashes_buffer[idx as usize], *hash);
798 }
799 }
800 }
801
802 Ok(())
803}
804
805#[cfg(not(feature = "force_hash_collisions"))]
806fn hash_fixed_list_array(
807 array: &FixedSizeListArray,
808 random_state: &RandomState,
809 hashes_buffer: &mut [u64],
810) -> Result<()> {
811 let values = array.values();
812 let value_length = array.value_length() as usize;
813 let nulls = array.nulls();
814 let mut values_hashes = vec![0u64; values.len()];
815 create_hashes([values], random_state, &mut values_hashes)?;
816 if let Some(nulls) = nulls {
817 for i in 0..array.len() {
818 if nulls.is_valid(i) {
819 let hash = &mut hashes_buffer[i];
820 for values_hash in
821 &values_hashes[i * value_length..(i + 1) * value_length]
822 {
823 *hash = combine_hashes(*hash, *values_hash);
824 }
825 }
826 }
827 } else {
828 for i in 0..array.len() {
829 let hash = &mut hashes_buffer[i];
830 for values_hash in &values_hashes[i * value_length..(i + 1) * value_length] {
831 *hash = combine_hashes(*hash, *values_hash);
832 }
833 }
834 }
835 Ok(())
836}
837
838#[inline(never)]
840#[cfg(not(feature = "force_hash_collisions"))]
841fn hash_run_array_inner<
842 R: RunEndIndexType,
843 const HAS_NULL_VALUES: bool,
844 const REHASH: bool,
845>(
846 array: &RunArray<R>,
847 random_state: &RandomState,
848 hashes_buffer: &mut [u64],
849) -> Result<()> {
850 let array_offset = array.offset();
854 let array_len = array.len();
855
856 if array_len == 0 {
857 return Ok(());
858 }
859
860 let run_ends = array.run_ends();
861 let run_ends_values = run_ends.values();
862 let values = array.values();
863
864 let start_physical_index = array.get_start_physical_index();
865 let end_physical_index = array.get_end_physical_index() + 1;
868
869 let sliced_values = values.slice(
870 start_physical_index,
871 end_physical_index - start_physical_index,
872 );
873 let mut values_hashes = vec![0u64; sliced_values.len()];
874 create_hashes(
875 std::slice::from_ref(&sliced_values),
876 random_state,
877 &mut values_hashes,
878 )?;
879
880 let mut start_in_slice = 0;
881 for (adjusted_physical_index, &absolute_run_end) in run_ends_values
882 [start_physical_index..end_physical_index]
883 .iter()
884 .enumerate()
885 {
886 let absolute_run_end = absolute_run_end.as_usize();
887 let end_in_slice = (absolute_run_end - array_offset).min(array_len);
888
889 if HAS_NULL_VALUES && sliced_values.is_null(adjusted_physical_index) {
890 start_in_slice = end_in_slice;
891 continue;
892 }
893
894 let value_hash = values_hashes[adjusted_physical_index];
895 let run_slice = &mut hashes_buffer[start_in_slice..end_in_slice];
896
897 if REHASH {
898 for hash in run_slice.iter_mut() {
899 *hash = combine_hashes(value_hash, *hash);
900 }
901 } else {
902 run_slice.fill(value_hash);
903 }
904
905 start_in_slice = end_in_slice;
906 }
907
908 Ok(())
909}
910
911#[cfg(not(feature = "force_hash_collisions"))]
912fn hash_run_array<R: RunEndIndexType>(
913 array: &RunArray<R>,
914 random_state: &RandomState,
915 hashes_buffer: &mut [u64],
916 rehash: bool,
917) -> Result<()> {
918 let has_null_values = array.values().null_count() != 0;
919
920 match (has_null_values, rehash) {
921 (false, false) => {
922 hash_run_array_inner::<R, false, false>(array, random_state, hashes_buffer)
923 }
924 (false, true) => {
925 hash_run_array_inner::<R, false, true>(array, random_state, hashes_buffer)
926 }
927 (true, false) => {
928 hash_run_array_inner::<R, true, false>(array, random_state, hashes_buffer)
929 }
930 (true, true) => {
931 hash_run_array_inner::<R, true, true>(array, random_state, hashes_buffer)
932 }
933 }
934}
935
936#[cfg(not(feature = "force_hash_collisions"))]
939fn hash_single_array(
940 array: &dyn Array,
941 random_state: &RandomState,
942 hashes_buffer: &mut [u64],
943 rehash: bool,
944) -> Result<()> {
945 downcast_primitive_array! {
946 array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
947 DataType::Null => hash_null(random_state, hashes_buffer, rehash),
948 DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash),
949 DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash),
950 DataType::Utf8View => hash_generic_byte_view_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
951 DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash),
952 DataType::Binary => hash_array(&as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
953 DataType::BinaryView => hash_generic_byte_view_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
954 DataType::LargeBinary => hash_array(&as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
955 DataType::FixedSizeBinary(_) => {
956 let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
957 hash_array(&array, random_state, hashes_buffer, rehash)
958 }
959 DataType::Dictionary(_, _) => downcast_dictionary_array! {
960 array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
961 _ => unreachable!()
962 }
963 DataType::Struct(_) => {
964 let array = as_struct_array(array)?;
965 hash_struct_array(array, random_state, hashes_buffer)?;
966 }
967 DataType::List(_) => {
968 let array = as_list_array(array)?;
969 hash_list_array(array, random_state, hashes_buffer)?;
970 }
971 DataType::LargeList(_) => {
972 let array = as_large_list_array(array)?;
973 hash_list_array(array, random_state, hashes_buffer)?;
974 }
975 DataType::ListView(_) => {
976 let array = as_list_view_array(array)?;
977 hash_list_view_array(array, random_state, hashes_buffer)?;
978 }
979 DataType::LargeListView(_) => {
980 let array = as_large_list_view_array(array)?;
981 hash_list_view_array(array, random_state, hashes_buffer)?;
982 }
983 DataType::Map(_, _) => {
984 let array = as_map_array(array)?;
985 hash_map_array(array, random_state, hashes_buffer)?;
986 }
987 DataType::FixedSizeList(_,_) => {
988 let array = as_fixed_size_list_array(array)?;
989 hash_fixed_list_array(array, random_state, hashes_buffer)?;
990 }
991 DataType::Union(_, _) => {
992 let array = as_union_array(array)?;
993 hash_union_array(array, random_state, hashes_buffer)?;
994 }
995 DataType::RunEndEncoded(_, _) => downcast_run_array! {
996 array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
997 _ => unreachable!()
998 }
999 _ => {
1000 return _internal_err!(
1002 "Unsupported data type in hasher: {}",
1003 array.data_type()
1004 );
1005 }
1006 }
1007 Ok(())
1008}
1009
1010#[cfg(feature = "force_hash_collisions")]
1012fn hash_single_array(
1013 _array: &dyn Array,
1014 _random_state: &RandomState,
1015 hashes_buffer: &mut [u64],
1016 _rehash: bool,
1017) -> Result<()> {
1018 for hash in hashes_buffer.iter_mut() {
1019 *hash = 0
1020 }
1021 Ok(())
1022}
1023
1024pub trait AsDynArray {
1034 fn as_dyn_array(&self) -> &dyn Array;
1035}
1036
1037impl AsDynArray for dyn Array {
1038 fn as_dyn_array(&self) -> &dyn Array {
1039 self
1040 }
1041}
1042
1043impl AsDynArray for &dyn Array {
1044 fn as_dyn_array(&self) -> &dyn Array {
1045 *self
1046 }
1047}
1048
1049impl AsDynArray for ArrayRef {
1050 fn as_dyn_array(&self) -> &dyn Array {
1051 self.as_ref()
1052 }
1053}
1054
1055impl AsDynArray for &ArrayRef {
1056 fn as_dyn_array(&self) -> &dyn Array {
1057 self.as_ref()
1058 }
1059}
1060
1061pub fn create_hashes<'a, I, T>(
1066 arrays: I,
1067 random_state: &RandomState,
1068 hashes_buffer: &'a mut [u64],
1069) -> Result<&'a mut [u64]>
1070where
1071 I: IntoIterator<Item = T>,
1072 T: AsDynArray,
1073{
1074 for (i, array) in arrays.into_iter().enumerate() {
1075 let rehash = i >= 1;
1077 hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?;
1078 }
1079 Ok(hashes_buffer)
1080}
1081
1082#[cfg(test)]
1083mod tests {
1084 use std::sync::Arc;
1085
1086 use arrow::array::*;
1087 #[cfg(not(feature = "force_hash_collisions"))]
1088 use arrow::datatypes::*;
1089
1090 use super::*;
1091
1092 #[test]
1093 fn create_hashes_for_decimal_array() -> Result<()> {
1094 let array = vec![1, 2, 3, 4]
1095 .into_iter()
1096 .map(Some)
1097 .collect::<Decimal128Array>()
1098 .with_precision_and_scale(20, 3)
1099 .unwrap();
1100 let array_ref: ArrayRef = Arc::new(array);
1101 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1102 let hashes_buff = &mut vec![0; array_ref.len()];
1103 let hashes = create_hashes(&[array_ref], &random_state, hashes_buff)?;
1104 assert_eq!(hashes.len(), 4);
1105 Ok(())
1106 }
1107
1108 #[test]
1109 fn create_hashes_for_empty_fixed_size_lit() -> Result<()> {
1110 let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
1111 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1112 let hashes_buff = &mut [0; 0];
1113 let hashes = create_hashes(
1114 &[Arc::new(empty_array) as ArrayRef],
1115 &random_state,
1116 hashes_buff,
1117 )?;
1118 assert_eq!(hashes, &Vec::<u64>::new());
1119 Ok(())
1120 }
1121
1122 #[test]
1123 fn create_hashes_for_float_arrays() -> Result<()> {
1124 let f32_arr: ArrayRef =
1125 Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
1126 let f64_arr: ArrayRef =
1127 Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
1128
1129 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1130 let hashes_buff = &mut vec![0; f32_arr.len()];
1131 let hashes = create_hashes(&[f32_arr], &random_state, hashes_buff)?;
1132 assert_eq!(hashes.len(), 4,);
1133
1134 let hashes = create_hashes(&[f64_arr], &random_state, hashes_buff)?;
1135 assert_eq!(hashes.len(), 4,);
1136
1137 Ok(())
1138 }
1139
1140 macro_rules! create_hash_binary {
1141 ($NAME:ident, $ARRAY:ty) => {
1142 #[cfg(not(feature = "force_hash_collisions"))]
1143 #[test]
1144 fn $NAME() {
1145 let binary = [
1146 Some(b"short".to_byte_slice()),
1147 None,
1148 Some(b"long but different 12 bytes string"),
1149 Some(b"short2"),
1150 Some(b"Longer than 12 bytes string"),
1151 Some(b"short"),
1152 Some(b"Longer than 12 bytes string"),
1153 ];
1154
1155 let binary_array: ArrayRef =
1156 Arc::new(binary.iter().cloned().collect::<$ARRAY>());
1157
1158 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1159
1160 let mut binary_hashes = vec![0; binary.len()];
1161 create_hashes(&[binary_array], &random_state, &mut binary_hashes)
1162 .unwrap();
1163
1164 for (val, hash) in binary.iter().zip(binary_hashes.iter()) {
1166 match val {
1167 Some(_) => assert_ne!(*hash, 0),
1168 None => assert_eq!(*hash, 0),
1169 }
1170 }
1171
1172 assert_eq!(binary[0], binary[5]);
1174 assert_eq!(binary[4], binary[6]);
1175
1176 assert_ne!(binary[0], binary[2]);
1178 }
1179 };
1180 }
1181
1182 create_hash_binary!(binary_array, BinaryArray);
1183 create_hash_binary!(large_binary_array, LargeBinaryArray);
1184 create_hash_binary!(binary_view_array, BinaryViewArray);
1185
1186 #[test]
1187 fn create_hashes_fixed_size_binary() -> Result<()> {
1188 let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
1189 let fixed_size_binary_array: ArrayRef =
1190 Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap());
1191
1192 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1193 let hashes_buff = &mut vec![0; fixed_size_binary_array.len()];
1194 let hashes =
1195 create_hashes(&[fixed_size_binary_array], &random_state, hashes_buff)?;
1196 assert_eq!(hashes.len(), 3,);
1197
1198 Ok(())
1199 }
1200
1201 macro_rules! create_hash_string {
1202 ($NAME:ident, $ARRAY:ty) => {
1203 #[cfg(not(feature = "force_hash_collisions"))]
1204 #[test]
1205 fn $NAME() {
1206 let strings = [
1207 Some("short"),
1208 None,
1209 Some("long but different 12 bytes string"),
1210 Some("short2"),
1211 Some("Longer than 12 bytes string"),
1212 Some("short"),
1213 Some("Longer than 12 bytes string"),
1214 ];
1215
1216 let string_array: ArrayRef =
1217 Arc::new(strings.iter().cloned().collect::<$ARRAY>());
1218 let dict_array: ArrayRef = Arc::new(
1219 strings
1220 .iter()
1221 .cloned()
1222 .collect::<DictionaryArray<Int8Type>>(),
1223 );
1224
1225 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1226
1227 let mut string_hashes = vec![0; strings.len()];
1228 create_hashes(&[string_array], &random_state, &mut string_hashes)
1229 .unwrap();
1230
1231 let mut dict_hashes = vec![0; strings.len()];
1232 create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();
1233
1234 for (val, hash) in strings.iter().zip(string_hashes.iter()) {
1236 match val {
1237 Some(_) => assert_ne!(*hash, 0),
1238 None => assert_eq!(*hash, 0),
1239 }
1240 }
1241
1242 assert_eq!(string_hashes, dict_hashes);
1244
1245 assert_eq!(strings[0], strings[5]);
1247 assert_eq!(strings[4], strings[6]);
1248
1249 assert_ne!(strings[0], strings[2]);
1251 }
1252 };
1253 }
1254
1255 create_hash_string!(string_array, StringArray);
1256 create_hash_string!(large_string_array, LargeStringArray);
1257 create_hash_string!(string_view_array, StringArray);
1258 create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
1259
1260 #[test]
1261 #[cfg(not(feature = "force_hash_collisions"))]
1262 fn create_hashes_for_run_array() -> Result<()> {
1263 let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
1264 let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
1265 let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1266
1267 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1268 let hashes_buff = &mut vec![0; array.len()];
1269 let hashes = create_hashes(
1270 &[Arc::clone(&array) as ArrayRef],
1271 &random_state,
1272 hashes_buff,
1273 )?;
1274
1275 assert_eq!(hashes.len(), 7);
1276 assert_eq!(hashes[0], hashes[1]);
1277 assert_eq!(hashes[2], hashes[3]);
1278 assert_eq!(hashes[3], hashes[4]);
1279 assert_eq!(hashes[5], hashes[6]);
1280 assert_ne!(hashes[0], hashes[2]);
1281 assert_ne!(hashes[2], hashes[5]);
1282 assert_ne!(hashes[0], hashes[5]);
1283
1284 Ok(())
1285 }
1286
1287 #[test]
1288 #[cfg(not(feature = "force_hash_collisions"))]
1289 fn create_multi_column_hash_with_run_array() -> Result<()> {
1290 let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));
1291 let values = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
1292 let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
1293 let run_array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1294
1295 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1296 let mut one_col_hashes = vec![0; int_array.len()];
1297 create_hashes(
1298 &[Arc::clone(&int_array) as ArrayRef],
1299 &random_state,
1300 &mut one_col_hashes,
1301 )?;
1302
1303 let mut two_col_hashes = vec![0; int_array.len()];
1304 create_hashes(
1305 &[
1306 Arc::clone(&int_array) as ArrayRef,
1307 Arc::clone(&run_array) as ArrayRef,
1308 ],
1309 &random_state,
1310 &mut two_col_hashes,
1311 )?;
1312
1313 assert_eq!(one_col_hashes.len(), 7);
1314 assert_eq!(two_col_hashes.len(), 7);
1315 assert_ne!(one_col_hashes, two_col_hashes);
1316
1317 let diff_0_vs_1_one_col = one_col_hashes[0] != one_col_hashes[1];
1318 let diff_0_vs_1_two_col = two_col_hashes[0] != two_col_hashes[1];
1319 assert_eq!(diff_0_vs_1_one_col, diff_0_vs_1_two_col);
1320
1321 let diff_2_vs_3_one_col = one_col_hashes[2] != one_col_hashes[3];
1322 let diff_2_vs_3_two_col = two_col_hashes[2] != two_col_hashes[3];
1323 assert_eq!(diff_2_vs_3_one_col, diff_2_vs_3_two_col);
1324
1325 Ok(())
1326 }
1327
1328 #[test]
1329 #[cfg(not(feature = "force_hash_collisions"))]
1331 fn create_hashes_for_dict_arrays() {
1332 let strings = [Some("foo"), None, Some("bar"), Some("foo"), None];
1333
1334 let string_array: ArrayRef =
1335 Arc::new(strings.iter().cloned().collect::<StringArray>());
1336 let dict_array: ArrayRef = Arc::new(
1337 strings
1338 .iter()
1339 .cloned()
1340 .collect::<DictionaryArray<Int8Type>>(),
1341 );
1342
1343 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1344
1345 let mut string_hashes = vec![0; strings.len()];
1346 create_hashes(&[string_array], &random_state, &mut string_hashes).unwrap();
1347
1348 let mut dict_hashes = vec![0; strings.len()];
1349 create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();
1350
1351 for (val, hash) in strings.iter().zip(string_hashes.iter()) {
1353 match val {
1354 Some(_) => assert_ne!(*hash, 0),
1355 None => assert_eq!(*hash, 0),
1356 }
1357 }
1358
1359 assert_eq!(string_hashes, dict_hashes);
1361
1362 assert_eq!(strings[1], strings[4]);
1364 assert_eq!(dict_hashes[1], dict_hashes[4]);
1365 assert_eq!(strings[0], strings[3]);
1366 assert_eq!(dict_hashes[0], dict_hashes[3]);
1367
1368 assert_ne!(strings[0], strings[2]);
1370 assert_ne!(dict_hashes[0], dict_hashes[2]);
1371 }
1372
1373 #[test]
1374 #[cfg(not(feature = "force_hash_collisions"))]
1376 fn create_hashes_for_list_arrays() {
1377 let data = vec![
1378 Some(vec![Some(0), Some(1), Some(2)]),
1379 None,
1380 Some(vec![Some(3), None, Some(5)]),
1381 Some(vec![Some(3), None, Some(5)]),
1382 None,
1383 Some(vec![Some(0), Some(1), Some(2)]),
1384 Some(vec![]),
1385 ];
1386 let list_array =
1387 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
1388 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1389 let mut hashes = vec![0; list_array.len()];
1390 create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
1391 assert_eq!(hashes[0], hashes[5]);
1392 assert_eq!(hashes[1], hashes[4]);
1393 assert_eq!(hashes[2], hashes[3]);
1394 assert_eq!(hashes[1], hashes[6]); }
1396
1397 #[test]
1398 #[cfg(not(feature = "force_hash_collisions"))]
1399 fn create_hashes_for_sliced_list_arrays() {
1400 let data = vec![
1401 Some(vec![Some(0), Some(1), Some(2)]),
1402 None,
1403 Some(vec![Some(3), None, Some(5)]),
1405 Some(vec![Some(3), None, Some(5)]),
1406 None,
1407 Some(vec![Some(0), Some(1), Some(2)]),
1409 Some(vec![]),
1410 ];
1411 let list_array =
1412 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
1413 let list_array = list_array.slice(2, 3);
1414 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1415 let mut hashes = vec![0; list_array.len()];
1416 create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
1417 assert_eq!(hashes[0], hashes[1]);
1418 assert_ne!(hashes[1], hashes[2]);
1419 }
1420
1421 #[test]
1422 #[cfg(not(feature = "force_hash_collisions"))]
1424 fn create_hashes_for_list_view_arrays() {
1425 use arrow::buffer::{NullBuffer, ScalarBuffer};
1426
1427 let values = Arc::new(Int32Array::from(vec![
1429 Some(0),
1430 Some(1),
1431 Some(2),
1432 Some(3),
1433 None,
1434 Some(5),
1435 ])) as ArrayRef;
1436 let field = Arc::new(Field::new("item", DataType::Int32, true));
1437
1438 let offsets = ScalarBuffer::from(vec![0i32, 0, 3, 3, 0, 0, 0]);
1447 let sizes = ScalarBuffer::from(vec![3i32, 0, 3, 3, 0, 3, 0]);
1448 let nulls = Some(NullBuffer::from(vec![
1449 true, false, true, true, false, true, true,
1450 ]));
1451
1452 let list_view_array =
1453 Arc::new(ListViewArray::new(field, offsets, sizes, values, nulls))
1454 as ArrayRef;
1455
1456 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1457 let mut hashes = vec![0; list_view_array.len()];
1458 create_hashes(&[list_view_array], &random_state, &mut hashes).unwrap();
1459
1460 assert_eq!(hashes[0], hashes[5]); assert_eq!(hashes[1], hashes[4]); assert_eq!(hashes[2], hashes[3]); assert_eq!(hashes[1], hashes[6]); assert_ne!(hashes[0], hashes[2]); assert_ne!(hashes[0], hashes[6]); assert_ne!(hashes[2], hashes[6]); }
1470
1471 #[test]
1472 #[cfg(not(feature = "force_hash_collisions"))]
1474 fn create_hashes_for_large_list_view_arrays() {
1475 use arrow::buffer::{NullBuffer, ScalarBuffer};
1476
1477 let values = Arc::new(Int32Array::from(vec![
1479 Some(0),
1480 Some(1),
1481 Some(2),
1482 Some(3),
1483 None,
1484 Some(5),
1485 ])) as ArrayRef;
1486 let field = Arc::new(Field::new("item", DataType::Int32, true));
1487
1488 let offsets = ScalarBuffer::from(vec![0i64, 0, 3, 3, 0, 0, 0]);
1497 let sizes = ScalarBuffer::from(vec![3i64, 0, 3, 3, 0, 3, 0]);
1498 let nulls = Some(NullBuffer::from(vec![
1499 true, false, true, true, false, true, true,
1500 ]));
1501
1502 let large_list_view_array = Arc::new(LargeListViewArray::new(
1503 field, offsets, sizes, values, nulls,
1504 )) as ArrayRef;
1505
1506 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1507 let mut hashes = vec![0; large_list_view_array.len()];
1508 create_hashes(&[large_list_view_array], &random_state, &mut hashes).unwrap();
1509
1510 assert_eq!(hashes[0], hashes[5]); assert_eq!(hashes[1], hashes[4]); assert_eq!(hashes[2], hashes[3]); assert_eq!(hashes[1], hashes[6]); assert_ne!(hashes[0], hashes[2]); assert_ne!(hashes[0], hashes[6]); assert_ne!(hashes[2], hashes[6]); }
1520
1521 #[test]
1522 #[cfg(not(feature = "force_hash_collisions"))]
1524 fn create_hashes_for_fixed_size_list_arrays() {
1525 let data = vec![
1526 Some(vec![Some(0), Some(1), Some(2)]),
1527 None,
1528 Some(vec![Some(3), None, Some(5)]),
1529 Some(vec![Some(3), None, Some(5)]),
1530 None,
1531 Some(vec![Some(0), Some(1), Some(2)]),
1532 ];
1533 let list_array =
1534 Arc::new(FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
1535 data, 3,
1536 )) as ArrayRef;
1537 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1538 let mut hashes = vec![0; list_array.len()];
1539 create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
1540 assert_eq!(hashes[0], hashes[5]);
1541 assert_eq!(hashes[1], hashes[4]);
1542 assert_eq!(hashes[2], hashes[3]);
1543 }
1544
1545 #[test]
1546 #[cfg(not(feature = "force_hash_collisions"))]
1548 fn create_hashes_for_struct_arrays() {
1549 use arrow::buffer::Buffer;
1550
1551 let boolarr = Arc::new(BooleanArray::from(vec![
1552 false, false, true, true, true, true,
1553 ]));
1554 let i32arr = Arc::new(Int32Array::from(vec![10, 10, 20, 20, 30, 31]));
1555
1556 let struct_array = StructArray::from((
1557 vec![
1558 (
1559 Arc::new(Field::new("bool", DataType::Boolean, false)),
1560 Arc::clone(&boolarr) as ArrayRef,
1561 ),
1562 (
1563 Arc::new(Field::new("i32", DataType::Int32, false)),
1564 Arc::clone(&i32arr) as ArrayRef,
1565 ),
1566 (
1567 Arc::new(Field::new("i32", DataType::Int32, false)),
1568 Arc::clone(&i32arr) as ArrayRef,
1569 ),
1570 (
1571 Arc::new(Field::new("bool", DataType::Boolean, false)),
1572 Arc::clone(&boolarr) as ArrayRef,
1573 ),
1574 ],
1575 Buffer::from(&[0b001011]),
1576 ));
1577
1578 assert!(struct_array.is_valid(0));
1579 assert!(struct_array.is_valid(1));
1580 assert!(struct_array.is_null(2));
1581 assert!(struct_array.is_valid(3));
1582 assert!(struct_array.is_null(4));
1583 assert!(struct_array.is_null(5));
1584
1585 let array = Arc::new(struct_array) as ArrayRef;
1586
1587 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1588 let mut hashes = vec![0; array.len()];
1589 create_hashes(&[array], &random_state, &mut hashes).unwrap();
1590 assert_eq!(hashes[0], hashes[1]);
1591 assert_ne!(hashes[2], hashes[3]);
1593 assert_eq!(hashes[4], hashes[5]);
1595 }
1596
1597 #[test]
1598 #[cfg(not(feature = "force_hash_collisions"))]
1600 fn create_hashes_for_struct_arrays_more_column_than_row() {
1601 let struct_array = StructArray::from(vec![
1602 (
1603 Arc::new(Field::new("bool", DataType::Boolean, false)),
1604 Arc::new(BooleanArray::from(vec![false, false])) as ArrayRef,
1605 ),
1606 (
1607 Arc::new(Field::new("i32-1", DataType::Int32, false)),
1608 Arc::new(Int32Array::from(vec![10, 10])) as ArrayRef,
1609 ),
1610 (
1611 Arc::new(Field::new("i32-2", DataType::Int32, false)),
1612 Arc::new(Int32Array::from(vec![10, 10])) as ArrayRef,
1613 ),
1614 (
1615 Arc::new(Field::new("i32-3", DataType::Int32, false)),
1616 Arc::new(Int32Array::from(vec![10, 10])) as ArrayRef,
1617 ),
1618 ]);
1619
1620 assert!(struct_array.is_valid(0));
1621 assert!(struct_array.is_valid(1));
1622
1623 let array = Arc::new(struct_array) as ArrayRef;
1624 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1625 let mut hashes = vec![0; array.len()];
1626 create_hashes(&[array], &random_state, &mut hashes).unwrap();
1627 assert_eq!(hashes[0], hashes[1]);
1628 }
1629
1630 #[test]
1631 #[cfg(not(feature = "force_hash_collisions"))]
1633 fn create_hashes_for_map_arrays() {
1634 let mut builder =
1635 MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
1636 builder.keys().append_value("key1");
1638 builder.keys().append_value("key2");
1639 builder.values().append_value(1);
1640 builder.values().append_value(2);
1641 builder.append(true).unwrap();
1642 builder.keys().append_value("key1");
1644 builder.keys().append_value("key2");
1645 builder.values().append_value(1);
1646 builder.values().append_value(2);
1647 builder.append(true).unwrap();
1648 builder.keys().append_value("key1");
1650 builder.keys().append_value("key2");
1651 builder.values().append_value(1);
1652 builder.values().append_value(3);
1653 builder.append(true).unwrap();
1654 builder.keys().append_value("key1");
1656 builder.keys().append_value("key3");
1657 builder.values().append_value(1);
1658 builder.values().append_value(2);
1659 builder.append(true).unwrap();
1660 builder.keys().append_value("key1");
1662 builder.values().append_value(1);
1663 builder.append(true).unwrap();
1664 builder.keys().append_value("key1");
1666 builder.values().append_null();
1667 builder.append(true).unwrap();
1668 builder.append(true).unwrap();
1670 builder.keys().append_value("key1");
1672 builder.values().append_value(1);
1673 builder.append(false).unwrap();
1674
1675 let array = Arc::new(builder.finish()) as ArrayRef;
1676
1677 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1678 let mut hashes = vec![0; array.len()];
1679 create_hashes(&[array], &random_state, &mut hashes).unwrap();
1680 assert_eq!(hashes[0], hashes[1]); assert_ne!(hashes[0], hashes[2]); assert_ne!(hashes[0], hashes[3]); assert_ne!(hashes[0], hashes[4]); assert_ne!(hashes[4], hashes[5]); assert_eq!(hashes[6], hashes[7]); }
1687
1688 #[test]
1689 #[cfg(not(feature = "force_hash_collisions"))]
1691 fn create_multi_column_hash_for_dict_arrays() {
1692 let strings1 = [Some("foo"), None, Some("bar")];
1693 let strings2 = [Some("blarg"), Some("blah"), None];
1694
1695 let string_array: ArrayRef =
1696 Arc::new(strings1.iter().cloned().collect::<StringArray>());
1697 let dict_array: ArrayRef = Arc::new(
1698 strings2
1699 .iter()
1700 .cloned()
1701 .collect::<DictionaryArray<Int32Type>>(),
1702 );
1703
1704 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1705
1706 let mut one_col_hashes = vec![0; strings1.len()];
1707 create_hashes(
1708 &[Arc::clone(&dict_array) as ArrayRef],
1709 &random_state,
1710 &mut one_col_hashes,
1711 )
1712 .unwrap();
1713
1714 let mut two_col_hashes = vec![0; strings1.len()];
1715 create_hashes(
1716 &[dict_array, string_array],
1717 &random_state,
1718 &mut two_col_hashes,
1719 )
1720 .unwrap();
1721
1722 assert_eq!(one_col_hashes.len(), 3);
1723 assert_eq!(two_col_hashes.len(), 3);
1724
1725 assert_ne!(one_col_hashes, two_col_hashes);
1726 }
1727
1728 #[test]
1729 fn test_create_hashes_from_arrays() {
1730 let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1731 let float_array: ArrayRef =
1732 Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
1733
1734 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1735 let hashes_buff = &mut vec![0; int_array.len()];
1736 let hashes =
1737 create_hashes(&[int_array, float_array], &random_state, hashes_buff).unwrap();
1738 assert_eq!(hashes.len(), 4,);
1739 }
1740
1741 #[test]
1742 fn test_create_hashes_from_dyn_arrays() {
1743 let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1744 let float_array: ArrayRef =
1745 Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
1746
1747 fn test(arr1: &dyn Array, arr2: &dyn Array) {
1749 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1750 let hashes_buff = &mut vec![0; arr1.len()];
1751 let hashes = create_hashes([arr1, arr2], &random_state, hashes_buff).unwrap();
1752 assert_eq!(hashes.len(), 4,);
1753 }
1754 test(&*int_array, &*float_array);
1755 }
1756
1757 #[test]
1758 fn test_create_hashes_equivalence() {
1759 let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1760 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1761
1762 let mut hashes1 = vec![0; array.len()];
1763 create_hashes(
1764 &[Arc::clone(&array) as ArrayRef],
1765 &random_state,
1766 &mut hashes1,
1767 )
1768 .unwrap();
1769
1770 let mut hashes2 = vec![0; array.len()];
1771 create_hashes([array], &random_state, &mut hashes2).unwrap();
1772
1773 assert_eq!(hashes1, hashes2);
1774 }
1775
1776 #[test]
1777 fn test_with_hashes() {
1778 let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1779 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1780
1781 let mut expected_hashes = vec![0; array.len()];
1783 create_hashes([&array], &random_state, &mut expected_hashes).unwrap();
1784
1785 let result = with_hashes([&array], &random_state, |hashes| {
1786 assert_eq!(hashes.len(), 4);
1787 assert_eq!(hashes, &expected_hashes[..]);
1789 Ok(hashes.to_vec())
1791 })
1792 .unwrap();
1793
1794 assert_eq!(result, expected_hashes);
1796 }
1797
1798 #[test]
1799 fn test_with_hashes_multi_column() {
1800 let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
1801 let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
1802 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1803
1804 let mut expected_hashes = vec![0; int_array.len()];
1806 create_hashes(
1807 [&int_array, &str_array],
1808 &random_state,
1809 &mut expected_hashes,
1810 )
1811 .unwrap();
1812
1813 with_hashes([&int_array, &str_array], &random_state, |hashes| {
1814 assert_eq!(hashes.len(), 3);
1815 assert_eq!(hashes, &expected_hashes[..]);
1816 Ok(())
1817 })
1818 .unwrap();
1819 }
1820
1821 #[test]
1822 fn test_with_hashes_empty_arrays() {
1823 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1824
1825 let empty: [&ArrayRef; 0] = [];
1827 let result = with_hashes(empty, &random_state, |_hashes| Ok(()));
1828
1829 assert!(result.is_err());
1830 assert!(
1831 result
1832 .unwrap_err()
1833 .to_string()
1834 .contains("requires at least one array")
1835 );
1836 }
1837
1838 #[test]
1839 fn test_with_hashes_reentrancy() {
1840 let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
1841 let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
1842 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1843
1844 let result = with_hashes([&array], &random_state, |_hashes| {
1846 with_hashes([&array2], &random_state, |_inner_hashes| Ok(()))
1848 });
1849
1850 assert!(result.is_err());
1851 let err_msg = result.unwrap_err().to_string();
1852 assert!(
1853 err_msg.contains("reentrantly") || err_msg.contains("cannot be called"),
1854 "Error message should mention reentrancy: {err_msg}",
1855 );
1856 }
1857
1858 #[test]
1859 #[cfg(not(feature = "force_hash_collisions"))]
1860 fn create_hashes_for_sparse_union_arrays() {
1861 let int_array = Int32Array::from(vec![Some(5), None, Some(10), Some(5)]);
1863 let str_array = StringArray::from(vec![None, Some("foo"), None, None]);
1864
1865 let type_ids = vec![0_i8, 1, 0, 0].into();
1866 let children = vec![
1867 Arc::new(int_array) as ArrayRef,
1868 Arc::new(str_array) as ArrayRef,
1869 ];
1870
1871 let union_fields = [
1872 (0, Arc::new(Field::new("a", DataType::Int32, true))),
1873 (1, Arc::new(Field::new("b", DataType::Utf8, true))),
1874 ]
1875 .into_iter()
1876 .collect();
1877
1878 let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
1879 let array_ref = Arc::new(array) as ArrayRef;
1880
1881 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1882 let mut hashes = vec![0; array_ref.len()];
1883 create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1884
1885 assert_eq!(hashes[0], hashes[3]);
1887 assert_ne!(hashes[0], hashes[2]);
1889 assert_ne!(hashes[0], hashes[1]);
1891 }
1892
1893 #[test]
1894 #[cfg(not(feature = "force_hash_collisions"))]
1895 fn create_hashes_for_sparse_union_arrays_with_nulls() {
1896 let int_array = Int32Array::from(vec![Some(5), None, None, None]);
1898 let str_array = StringArray::from(vec![None, Some("foo"), None, None]);
1899
1900 let type_ids = vec![0, 1, 0, 1].into();
1901 let children = vec![
1902 Arc::new(int_array) as ArrayRef,
1903 Arc::new(str_array) as ArrayRef,
1904 ];
1905
1906 let union_fields = [
1907 (0, Arc::new(Field::new("a", DataType::Int32, true))),
1908 (1, Arc::new(Field::new("b", DataType::Utf8, true))),
1909 ]
1910 .into_iter()
1911 .collect();
1912
1913 let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
1914 let array_ref = Arc::new(array) as ArrayRef;
1915
1916 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1917 let mut hashes = vec![0; array_ref.len()];
1918 create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1919
1920 assert_eq!(hashes[2], hashes[3]);
1923
1924 assert_ne!(hashes[0], hashes[2]);
1926
1927 assert_ne!(hashes[1], hashes[3]);
1929 }
1930
1931 #[test]
1932 #[cfg(not(feature = "force_hash_collisions"))]
1933 fn create_hashes_for_dense_union_arrays() {
1934 let int_array = Int32Array::from(vec![67, 100, 67]);
1937 let str_array = StringArray::from(vec!["norm", "macdonald"]);
1938
1939 let type_ids = vec![0, 1, 0, 1, 0].into();
1940 let offsets = vec![0, 0, 1, 1, 2].into();
1941 let children = vec![
1942 Arc::new(int_array) as ArrayRef,
1943 Arc::new(str_array) as ArrayRef,
1944 ];
1945
1946 let union_fields = [
1947 (0, Arc::new(Field::new("a", DataType::Int32, false))),
1948 (1, Arc::new(Field::new("b", DataType::Utf8, false))),
1949 ]
1950 .into_iter()
1951 .collect();
1952
1953 let array =
1954 UnionArray::try_new(union_fields, type_ids, Some(offsets), children).unwrap();
1955 let array_ref = Arc::new(array) as ArrayRef;
1956
1957 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1958 let mut hashes = vec![0; array_ref.len()];
1959 create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1960
1961 assert_ne!(hashes[0], hashes[1]);
1963 assert_ne!(hashes[0], hashes[2]);
1965 assert_ne!(hashes[1], hashes[3]);
1967 assert_ne!(hashes[2], hashes[3]);
1969 assert_eq!(hashes[0], hashes[4]);
1971 }
1972
1973 #[test]
1974 #[cfg(not(feature = "force_hash_collisions"))]
1975 fn create_hashes_for_sliced_run_array() -> Result<()> {
1976 let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
1977 let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
1978 let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1979
1980 let random_state = RandomState::with_seeds(0, 0, 0, 0);
1981 let mut full_hashes = vec![0; array.len()];
1982 create_hashes(
1983 &[Arc::clone(&array) as ArrayRef],
1984 &random_state,
1985 &mut full_hashes,
1986 )?;
1987
1988 let array_ref: ArrayRef = Arc::clone(&array) as ArrayRef;
1989 let sliced_array = array_ref.slice(2, 3);
1990
1991 let mut sliced_hashes = vec![0; sliced_array.len()];
1992 create_hashes(
1993 std::slice::from_ref(&sliced_array),
1994 &random_state,
1995 &mut sliced_hashes,
1996 )?;
1997
1998 assert_eq!(sliced_hashes.len(), 3);
1999 assert_eq!(sliced_hashes[0], sliced_hashes[1]);
2000 assert_eq!(sliced_hashes[1], sliced_hashes[2]);
2001 assert_eq!(&sliced_hashes, &full_hashes[2..5]);
2002
2003 Ok(())
2004 }
2005
2006 #[test]
2007 #[cfg(not(feature = "force_hash_collisions"))]
2008 fn test_run_array_with_nulls() -> Result<()> {
2009 let values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
2010 let run_ends = Arc::new(Int32Array::from(vec![2, 4, 6]));
2011 let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
2012
2013 let random_state = RandomState::with_seeds(0, 0, 0, 0);
2014 let mut hashes = vec![0; array.len()];
2015 create_hashes(
2016 &[Arc::clone(&array) as ArrayRef],
2017 &random_state,
2018 &mut hashes,
2019 )?;
2020
2021 assert_eq!(hashes[0], hashes[1]);
2022 assert_ne!(hashes[0], 0);
2023 assert_eq!(hashes[2], hashes[3]);
2024 assert_eq!(hashes[2], 0);
2025 assert_eq!(hashes[4], hashes[5]);
2026 assert_ne!(hashes[4], 0);
2027 assert_ne!(hashes[0], hashes[4]);
2028
2029 Ok(())
2030 }
2031
2032 #[test]
2033 #[cfg(not(feature = "force_hash_collisions"))]
2034 fn test_run_array_with_nulls_multicolumn() -> Result<()> {
2035 let primitive_array = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
2036 let run_values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
2037 let run_ends = Arc::new(Int32Array::from(vec![1, 2, 3]));
2038 let run_array =
2039 Arc::new(RunArray::try_new(&run_ends, run_values.as_ref()).unwrap());
2040 let second_col = Arc::new(Int32Array::from(vec![100, 200, 300]));
2041
2042 let random_state = RandomState::with_seeds(0, 0, 0, 0);
2043
2044 let mut primitive_hashes = vec![0; 3];
2045 create_hashes(
2046 &[
2047 Arc::clone(&primitive_array) as ArrayRef,
2048 Arc::clone(&second_col) as ArrayRef,
2049 ],
2050 &random_state,
2051 &mut primitive_hashes,
2052 )?;
2053
2054 let mut run_hashes = vec![0; 3];
2055 create_hashes(
2056 &[
2057 Arc::clone(&run_array) as ArrayRef,
2058 Arc::clone(&second_col) as ArrayRef,
2059 ],
2060 &random_state,
2061 &mut run_hashes,
2062 )?;
2063
2064 assert_eq!(primitive_hashes, run_hashes);
2065
2066 Ok(())
2067 }
2068}