#![allow(unsafe_op_in_unsafe_fn)]
mod any_all;
#[cfg(feature = "approx_unique")]
mod approx_n_unique;
#[cfg(feature = "bitwise")]
mod bitwise;
mod convert;
mod count;
mod first_last;
mod len;
mod mean;
mod min_max;
mod sum;
mod var_std;

use std::any::Any;
use std::borrow::Cow;
use std::marker::PhantomData;

use arrow::array::{Array, PrimitiveArray, StaticArray};
use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
pub use convert::into_reduction;
pub use min_max::{new_max_reduction, new_min_reduction};
use polars_core::prelude::*;

use crate::EvictIdx;

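/// A reduction that keeps one accumulator per group.
///
/// Accumulators are updated either for a single group at a time (`update_group`)
/// or for many rows at once through the subset-based methods, can be evicted
/// into a side buffer (`take_evictions`), merged from another instance
/// (`combine_subset`), and finally turned into a `Series` (`finalize`).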
pub trait GroupedReduction: Any + Send + Sync {
    fn new_empty(&self) -> Box<dyn GroupedReduction>;

    fn reserve(&mut self, additional: usize);

    fn resize(&mut self, num_groups: IdxSize);

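    /// Reduces all values in `values` into the accumulator of the group at `group_idx`.
    ///
    /// `seq_id` describes the relative order of updates; order-sensitive reducers
    /// (e.g. first/last) may use it, while others ignore it.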
    fn update_group(
        &mut self,
        values: &Column,
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()>;

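    /// For each `i`, reduces `values[subset[i]]` into the accumulator of group
    /// `group_idxs[i]`. The default implementation reinterprets `group_idxs` as
    /// `EvictIdx`s and forwards to `update_groups_while_evicting`.
    ///
    /// # Safety
    /// All indices in `subset` must be in bounds for `values`, and all indices
    /// in `group_idxs` must be in bounds for the current number of groups.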
    unsafe fn update_groups_subset(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.len() < (1 << (IdxSize::BITS - 1)));
        let evict_group_idxs = core::mem::transmute::<&[IdxSize], &[EvictIdx]>(group_idxs);
        self.update_groups_while_evicting(values, subset, evict_group_idxs, seq_id)
    }

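    /// Like `update_groups_subset`, but each `EvictIdx` may additionally mark its
    /// group for eviction: the group's current accumulator is then pushed into the
    /// eviction buffer and reset to the initial value before the incoming value is
    /// reduced into it.
    ///
    /// # Safety
    /// All indices in `subset` must be in bounds for `values`, and all group
    /// indices must be in bounds for the current number of groups.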
    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()>;

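    /// For each `i`, combines accumulator `subset[i]` of `other` into accumulator
    /// `group_idxs[i]` of `self`. `other` must be the same kind of reduction as `self`.
    ///
    /// # Safety
    /// All indices in `subset` must be in bounds for `other`'s groups, and all
    /// indices in `group_idxs` must be in bounds for `self`'s groups.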
    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()>;

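    /// Returns a new reduction containing the accumulators evicted so far,
    /// clearing the eviction buffer of `self`.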
    fn take_evictions(&mut self) -> Box<dyn GroupedReduction>;

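    /// Consumes the per-group accumulators and returns them as a `Series`,
    /// leaving `self` without any groups.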
    fn finalize(&mut self) -> PolarsResult<Series>;

    fn as_any(&self) -> &dyn Any;
}

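/// Describes how values of one physical dtype are folded into an accumulator.
///
/// `VecGroupedReduction` and `VecMaskGroupedReduction` lift a `Reducer` into a
/// full `GroupedReduction`.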
pub trait Reducer: Send + Sync + Clone + 'static {
    type Dtype: PolarsPhysicalType;
    type Value: Clone + Send + Sync + 'static;
    fn init(&self) -> Self::Value;
    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Borrowed(s)
    }
    fn combine(&self, a: &mut Self::Value, b: &Self::Value);
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        seq_id: u64,
    );
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series>;
}

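/// A simpler reduction interface for numeric types; `NumReducer` adapts any
/// `NumericReduction` into a `Reducer`.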
pub trait NumericReduction: Send + Sync + 'static {
    type Dtype: PolarsNumericType;
    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
    fn combine(
        a: <Self::Dtype as PolarsNumericType>::Native,
        b: <Self::Dtype as PolarsNumericType>::Native,
    ) -> <Self::Dtype as PolarsNumericType>::Native;
    fn reduce_ca(
        ca: &ChunkedArray<Self::Dtype>,
    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
}

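/// Adapter implementing `Reducer` for any `NumericReduction`.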
struct NumReducer<R: NumericReduction>(PhantomData<R>);
impl<R: NumericReduction> NumReducer<R> {
    fn new() -> Self {
        Self(PhantomData)
    }
}
impl<R: NumericReduction> Clone for NumReducer<R> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}

impl<R: NumericReduction> Reducer for NumReducer<R> {
    type Dtype = <R as NumericReduction>::Dtype;
    type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;

    #[inline(always)]
    fn init(&self) -> Self::Value {
        <R as NumericReduction>::init()
    }

    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        s.to_physical_repr()
    }

    #[inline(always)]
    fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
        *a = <R as NumericReduction>::combine(*a, *b);
    }

    #[inline(always)]
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        _seq_id: u64,
    ) {
        if let Some(b) = b {
            *a = <R as NumericReduction>::combine(*a, b);
        }
    }

    #[inline(always)]
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
        if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
            *v = <R as NumericReduction>::combine(*v, r);
        }
    }

    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series> {
        let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
        Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
    }
}

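/// A `GroupedReduction` storing one `R::Value` accumulator per group in a `Vec`,
/// without tracking whether a group has seen any non-null values.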
pub struct VecGroupedReduction<R: Reducer> {
    values: Vec<R::Value>,
    evicted_values: Vec<R::Value>,
    in_dtype: DataType,
    reducer: R,
}

impl<R: Reducer> VecGroupedReduction<R> {
    pub fn new(in_dtype: DataType, reducer: R) -> Self {
        Self {
            values: Vec::new(),
            evicted_values: Vec::new(),
            in_dtype,
            reducer,
        }
    }
}

impl<R> GroupedReduction for VecGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: Vec::new(),
            evicted_values: Vec::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, self.reducer.init());
    }

    fn update_group(
        &mut self,
        values: &Column,
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype() == &self.in_dtype);
        let seq_id = seq_id + 1;
        let values = values.as_materialized_series();
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        let seq_id = seq_id + 1;
        let values = values.as_materialized_series();
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            if values.has_nulls() {
                for (i, g) in subset.iter().zip(group_idxs) {
                    let ov = arr.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(g.idx());
                    if g.should_evict() {
                        let old = core::mem::replace(grp, self.reducer.init());
                        self.evicted_values.push(old);
                    }
                    self.reducer.reduce_one(grp, ov, seq_id);
                }
            } else {
                for (i, g) in subset.iter().zip(group_idxs) {
                    let v = arr.value_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(g.idx());
                    if g.should_evict() {
                        let old = core::mem::replace(grp, self.reducer.init());
                        self.evicted_values.push(old);
                    }
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        unsafe {
            for (i, g) in subset.iter().zip(group_idxs) {
                let v = other.values.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(*g as usize);
                self.reducer.combine(grp, v);
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            evicted_values: Vec::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        self.reducer.finish(v, None, &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

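/// Like `VecGroupedReduction`, but additionally keeps a validity mask marking
/// which groups have received at least one non-null value; groups that never did
/// are null in the finalized output.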
pub struct VecMaskGroupedReduction<R: Reducer> {
    values: Vec<R::Value>,
    mask: MutableBitmap,
    evicted_values: Vec<R::Value>,
    evicted_mask: BitmapBuilder,
    in_dtype: DataType,
    reducer: R,
}

impl<R: Reducer> VecMaskGroupedReduction<R> {
    fn new(in_dtype: DataType, reducer: R) -> Self {
        Self {
            values: Vec::new(),
            mask: MutableBitmap::new(),
            evicted_values: Vec::new(),
            evicted_mask: BitmapBuilder::new(),
            in_dtype,
            reducer,
        }
    }
}

impl<R> GroupedReduction for VecMaskGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::new(self.in_dtype.clone(), self.reducer.clone()))
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
        self.mask.reserve(additional)
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, self.reducer.init());
        self.mask.resize(num_groups as usize, false);
    }

    fn update_group(
        &mut self,
        values: &Column,
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype() == &self.in_dtype);
        let seq_id = seq_id + 1;
        let values = values.as_materialized_series();
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        if ca.len() != ca.null_count() {
            self.mask.set(group_idx as usize, true);
        }
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        let seq_id = seq_id + 1;
        let values = values.as_materialized_series();
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            for (i, g) in subset.iter().zip(group_idxs) {
                let ov = arr.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(g.idx());
                if g.should_evict() {
                    self.evicted_values
                        .push(core::mem::replace(grp, self.reducer.init()));
                    self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
                    self.mask.set_unchecked(g.idx(), false);
                }
                if let Some(v) = ov {
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                    self.mask.set_unchecked(g.idx(), true);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        unsafe {
            for (i, g) in subset.iter().zip(group_idxs) {
                let o = other.mask.get_unchecked(*i as usize);
                if o {
                    let v = other.values.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(*g as usize);
                    self.reducer.combine(grp, v);
                    self.mask.set_unchecked(*g as usize, true);
                }
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            mask: core::mem::take(&mut self.evicted_mask).into_mut(),
            evicted_values: Vec::new(),
            evicted_mask: BitmapBuilder::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        let m = core::mem::take(&mut self.mask);
        self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

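/// A `GroupedReduction` for null-typed input: it only tracks the number of groups
/// (and evictions) and finalizes to `output` repeated once per group.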
#[derive(Clone)]
pub struct NullGroupedReduction {
    num_groups: IdxSize,
    num_evictions: IdxSize,
    output: Scalar,
}

impl NullGroupedReduction {
    pub fn new(output: Scalar) -> Self {
        Self {
            num_groups: 0,
            num_evictions: 0,
            output,
        }
    }
}

impl GroupedReduction for NullGroupedReduction {
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::new(self.output.clone()))
    }

    fn reserve(&mut self, _additional: usize) {}

    fn resize(&mut self, num_groups: IdxSize) {
        self.num_groups = num_groups;
    }

    fn update_group(
        &mut self,
        values: &Column,
        _group_idx: IdxSize,
        _seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype().is_null());
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        _seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype().is_null());
        assert!(subset.len() == group_idxs.len());
        for g in group_idxs {
            self.num_evictions += g.should_evict() as IdxSize;
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let _other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(subset.len() == group_idxs.len());
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            num_groups: core::mem::replace(&mut self.num_evictions, 0),
            num_evictions: 0,
            output: self.output.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let length = core::mem::replace(&mut self.num_groups, 0) as usize;
        let s = self.output.clone().into_series(PlSmallStr::EMPTY);
        Ok(s.new_from_index(0, length))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}