// Group-by via "perfect hashing": integer values are used directly as group indices.
use std::fmt::Debug;
use num_traits::{FromPrimitive, ToPrimitive};
use polars_utils::idx_vec::IdxVec;
use polars_utils::slice::GetSaferUnchecked;
use polars_utils::sync::SyncPtr;
use rayon::prelude::*;
#[cfg(all(feature = "dtype-categorical", feature = "performant"))]
use crate::config::verbose;
use crate::datatypes::*;
use crate::prelude::*;
use crate::POOL;
impl<T> ChunkedArray<T>
where
    T: PolarsIntegerType,
    T::Native: ToPrimitive + FromPrimitive + Debug,
{
    /// Groups the rows by using every value directly as the index of its group.
    ///
    /// Every non-null value `v` is appended to `groups[v]`, and the first row that hits a
    /// group is stored in `first[v]`. Null rows are collected in one extra group that lives
    /// in the last slot. Slots whose value never occurs keep an empty group and
    /// `IdxSize::MAX` as their `first` entry (in both the parallel and the sequential path).
    ///
    /// # Arguments
    /// * `max` - the largest value present in the array; `max + 1` slots are allocated, plus
    ///   one more when the array contains nulls.
    /// * `multithreaded` - allows the parallel path. It is only taken when the pool has more
    ///   than one thread and `len / n_threads > 1`.
    /// * `group_capacity` - initial capacity of every group's index buffer.
    ///
    /// # Caller contract
    /// Slots are accessed through `get_unchecked_release_mut`, so every non-null value must
    /// be `<= max`. The sequential path additionally unwraps the `usize` conversion of every
    /// value.
    ///
    /// The returned groups are flagged as sorted.
    pub fn group_tuples_perfect(
        &self,
        max: usize,
        mut multithreaded: bool,
        group_capacity: usize,
    ) -> GroupsProxy {
        /// Appends `row_nr` to group `group_id` and records it in `first` when it is the
        /// first row of that group.
        ///
        /// # Safety
        /// `group_id` must be in bounds for both `groups` and `first`.
        #[inline]
        unsafe fn push_row(
            groups: &mut [IdxVec],
            first: &mut [IdxSize],
            group_id: usize,
            row_nr: IdxSize,
        ) {
            // SAFETY: the caller guarantees that `group_id` is in bounds for both slices.
            unsafe {
                let buf = groups.get_unchecked_release_mut(group_id);
                buf.push(row_nr);
                if buf.len() == 1 {
                    *first.get_unchecked_release_mut(group_id) = row_nr;
                }
            }
        }

        multithreaded &= POOL.current_num_threads() > 1;
        let len = if self.null_count() > 0 {
            // we add one to store the null sentinel group
            max + 2
        } else {
            max + 1
        };

        // the latest index will be used for the null sentinel
        let null_idx = len.saturating_sub(1);
        let n_threads = POOL.current_num_threads();

        let chunk_size = len / n_threads;

        let (groups, first) = if multithreaded && chunk_size > 1 {
            // Neither vec may reallocate (that could lose the 64-byte alignment); both are
            // only ever resized to `len`, the size requested from `aligned_vec`.
            let mut groups: Vec<IdxVec> = unsafe { aligned_vec(len) };
            groups.resize_with(len, || IdxVec::with_capacity(group_capacity));
            let mut first: Vec<IdxSize> = unsafe { aligned_vec(len) };
            // Give every slot a defined value, matching the sequential path. Slots of values
            // that never occur are not written by any task below.
            first.resize(len, IdxSize::MAX);

            // ensure we keep aligned to cache lines
            let chunk_size = (chunk_size * std::mem::size_of::<T::Native>()).next_multiple_of(64);
            let chunk_size = chunk_size / std::mem::size_of::<T::Native>();

            // Task `i` owns the value range `offsets[i]..offsets[i + 1]`.
            let mut cache_line_offsets = Vec::with_capacity(n_threads + 1);
            cache_line_offsets.push(0);
            let mut current_offset = chunk_size;

            while current_offset <= len {
                cache_line_offsets.push(current_offset);
                current_offset += chunk_size;
            }
            // The closing offset always lies past `len`.
            cache_line_offsets.push(current_offset);

            let groups_ptr = unsafe { SyncPtr::new(groups.as_mut_ptr()) };
            let first_ptr = unsafe { SyncPtr::new(first.as_mut_ptr()) };
            let n_tasks = cache_line_offsets.len() - 1;

            // The number of threads is dependent on the number of categoricals/ unique values
            // as every at least writes to a single cache line
            // lower bound per thread:
            // 32bit: 16
            // 64bit: 8
            POOL.install(|| {
                (0..n_tasks).into_par_iter().for_each(|thread_no| {
                    let mut row_nr = 0 as IdxSize;
                    // The bounds stay in `usize`: the closing offset exceeds `len` and is not
                    // necessarily representable in a narrow `T::Native`.
                    let start = cache_line_offsets[thread_no];
                    let end = cache_line_offsets[thread_no + 1];
                    // last thread handles null values
                    let handles_nulls = thread_no == n_tasks - 1;

                    // SAFETY: we don't alias; every task only writes the slots of its own
                    // value range, and only the last task writes the null slot.
                    let groups = unsafe { std::slice::from_raw_parts_mut(groups_ptr.get(), len) };
                    let first = unsafe { std::slice::from_raw_parts_mut(first_ptr.get(), len) };

                    for arr in self.downcast_iter() {
                        if arr.null_count() == 0 {
                            for &cat in arr.values().as_slice() {
                                // Values that do not fit in `usize` (negative ones) belong to
                                // no range and are skipped.
                                if let Some(cat) = cat.to_usize() {
                                    if cat >= start && cat < end {
                                        // SAFETY: the caller guarantees `cat <= max < len`.
                                        unsafe { push_row(groups, first, cat, row_nr) };
                                    }
                                }
                                row_nr += 1;
                            }
                        } else {
                            for opt_cat in arr.iter() {
                                if let Some(cat) = opt_cat {
                                    if let Some(cat) = cat.to_usize() {
                                        if cat >= start && cat < end {
                                            // SAFETY: the caller guarantees `cat <= max < len`.
                                            unsafe { push_row(groups, first, cat, row_nr) };
                                        }
                                    }
                                } else if handles_nulls {
                                    // SAFETY: `null_idx == len - 1` and `len >= 1`.
                                    unsafe { push_row(groups, first, null_idx, row_nr) };
                                }
                                row_nr += 1;
                            }
                        }
                    }
                });
            });
            (groups, first)
        } else {
            let mut groups = Vec::with_capacity(len);
            let mut first = vec![IdxSize::MAX; len];
            groups.resize_with(len, || IdxVec::with_capacity(group_capacity));

            let mut row_nr = 0 as IdxSize;
            for arr in self.downcast_iter() {
                for opt_cat in arr.iter() {
                    let group_id = match opt_cat {
                        Some(cat) => cat.to_usize().unwrap(),
                        None => null_idx,
                    };
                    // SAFETY: the caller guarantees that values are `<= max < len`, and
                    // `null_idx == len - 1`.
                    unsafe { push_row(&mut groups, &mut first, group_id, row_nr) };
                    row_nr += 1;
                }
            }
            (groups, first)
        };

        // NOTE! we set sorted here!
        // this happens to be true for `fast_unique` categoricals
        GroupsProxy::Idx(GroupsIdx::new(first, groups, true))
    }
}
#[cfg(all(feature = "dtype-categorical", feature = "performant"))]
// Dedicated implementation that lets categoricals be grouped in a single pass.
impl CategoricalChunked {
    /// Groups the rows of this categorical array by their physical values.
    ///
    /// An empty array yields empty groups. For a local reverse mapping on which
    /// `_can_fast_unique()` holds, the physical values are used directly as group indices
    /// (the "perfect" path), with `len - 1` of the local mapping's array passed as the largest
    /// value. Every other case falls back to the regular `group_tuples` of the physical array.
    ///
    /// When `sorted` is set, the resulting groups are sorted before they are returned.
    pub fn group_tuples_perfect(&self, multithreaded: bool, sorted: bool) -> GroupsProxy {
        let rev_map = self.get_rev_map();
        if self.is_empty() {
            return GroupsProxy::Idx(GroupsIdx::new(vec![], vec![], true));
        }
        let physical = self.physical();

        let mut groups = match &**rev_map {
            RevMapping::Local(categories, _) if self._can_fast_unique() => {
                if verbose() {
                    eprintln!("grouping categoricals, run perfect hash function");
                }
                // on relative small tables this isn't much faster than the default strategy
                // but on huge tables, this can be > 2x faster
                let max_category = categories.len() - 1;
                physical.group_tuples_perfect(max_category, multithreaded, 0)
            },
            // TODO! see if we can optimize the global case
            // the problem is that the global categories are not guaranteed packed together
            // so we might need to deref them first to local ones, but that might be more
            // expensive than just hashing (benchmark first)
            RevMapping::Local(..) | RevMapping::Global(..) => {
                physical.group_tuples(multithreaded, sorted).unwrap()
            },
        };

        if sorted {
            groups.sort()
        }
        groups
    }
}
/// Allocation unit of exactly 64 bytes that is aligned to 64 bytes.
#[repr(C, align(64))]
struct AlignTo64([u8; 64]);

/// Returns an empty [`Vec<T>`] whose buffer starts on a 64-byte boundary and has capacity for
/// at least `n` elements.
///
/// There are no guarantees that the [`Vec<T>`] will remain aligned if you reallocate the data.
/// This means that you cannot reallocate so you will need to know how big to allocate up front.
///
/// # Safety
/// The buffer is allocated as a `Vec<AlignTo64>` and handed over through
/// [`Vec::from_raw_parts`], so the caller must not grow the result beyond its capacity.
/// NOTE(review): the returned vec is released with `T`'s layout rather than the 64-byte
/// aligned layout it was allocated with; confirm the allocator in use tolerates this.
///
/// # Panics
/// Panics if `T` needs an alignment above 64, if `T` is zero-sized, or if
/// `n * size_of::<T>()` overflows `usize`.
unsafe fn aligned_vec<T>(n: usize) -> Vec<T> {
    const UNIT: usize = std::mem::size_of::<AlignTo64>();
    let elem_size = std::mem::size_of::<T>();
    assert!(std::mem::align_of::<T>() <= 64);

    // A wrapped product would silently hand out a buffer much smaller than requested.
    let n_bytes = n
        .checked_mul(elem_size)
        .expect("aligned_vec: requested size overflows usize");
    // One extra unit covers the remainder of the division.
    let n_units = n_bytes / UNIT + 1;

    // `ManuallyDrop` gives up ownership of the buffer without running the destructor.
    let mut aligned = std::mem::ManuallyDrop::new(Vec::<AlignTo64>::with_capacity(n_units));
    let ptr = aligned.as_mut_ptr();
    let cap_units = aligned.capacity();

    // SAFETY: `ptr` comes from a live `Vec` allocation of `cap_units * UNIT` bytes that is
    // aligned for `T` (checked above); the length is 0, so no element is assumed initialized.
    unsafe { Vec::from_raw_parts(ptr as *mut T, 0, cap_units * UNIT / elem_size) }
}