// polars_core/chunked_array/binary.rs
use polars_utils::aliases::PlRandomState;
2use polars_utils::hashing::BytesHash;
3use rayon::prelude::*;
4
5use crate::hashing::get_null_hash_value;
6use crate::prelude::*;
7use crate::utils::{_set_partition_size, _split_offsets};
8use crate::POOL;
9
10#[inline]
11fn fill_bytes_hashes<'a, T>(
12 ca: &'a ChunkedArray<T>,
13 null_h: u64,
14 hb: PlRandomState,
15) -> Vec<BytesHash<'a>>
16where
17 T: PolarsDataType,
18 <<T as PolarsDataType>::Array as StaticArray>::ValueT<'a>: AsRef<[u8]>,
19{
20 let mut byte_hashes = Vec::with_capacity(ca.len());
21 for arr in ca.downcast_iter() {
22 for opt_b in arr.iter() {
23 let opt_b = opt_b.as_ref().map(|v| v.as_ref());
24 let opt_b = unsafe { std::mem::transmute::<Option<&[u8]>, Option<&'a [u8]>>(opt_b) };
27 let hash = match opt_b {
28 Some(s) => hb.hash_one(s),
29 None => null_h,
30 };
31 byte_hashes.push(BytesHash::new(opt_b, hash))
32 }
33 }
34 byte_hashes
35}
36
37impl<T> ChunkedArray<T>
38where
39 T: PolarsDataType,
40 for<'a> <T::Array as StaticArray>::ValueT<'a>: AsRef<[u8]>,
41{
42 #[allow(clippy::needless_lifetimes)]
43 pub fn to_bytes_hashes<'a>(
44 &'a self,
45 mut multithreaded: bool,
46 hb: PlRandomState,
47 ) -> Vec<Vec<BytesHash<'a>>> {
48 multithreaded &= POOL.current_num_threads() > 1;
49 let null_h = get_null_hash_value(&hb);
50
51 if multithreaded {
52 let n_partitions = _set_partition_size();
53
54 let split = _split_offsets(self.len(), n_partitions);
55
56 POOL.install(|| {
57 split
58 .into_par_iter()
59 .map(|(offset, len)| {
60 let ca = self.slice(offset as i64, len);
61 let byte_hashes = fill_bytes_hashes(&ca, null_h, hb.clone());
62
63 unsafe {
66 std::mem::transmute::<Vec<BytesHash<'_>>, Vec<BytesHash<'a>>>(
67 byte_hashes,
68 )
69 }
70 })
71 .collect::<Vec<_>>()
72 })
73 } else {
74 vec![fill_bytes_hashes(self, null_h, hb.clone())]
75 }
76 }
77}