1#![allow(unsafe_op_in_unsafe_fn)]
2mod any_all;
3#[cfg(feature = "approx_unique")]
4mod approx_n_unique;
5#[cfg(feature = "bitwise")]
6mod bitwise;
7mod convert;
8mod count;
9#[cfg(feature = "cov")]
10mod cov;
11mod first_last;
12mod first_last_nonnull;
13mod has_nulls;
14mod implode;
15mod is_empty;
16mod mean;
17mod min_max;
18mod min_max_by;
19#[cfg(feature = "moment")]
20mod skew_kurtosis;
21mod sum;
22mod var_std;
23
24use std::any::Any;
25use std::borrow::Cow;
26use std::marker::PhantomData;
27
28use arrow::array::{Array, PrimitiveArray, StaticArray};
29use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
30pub use convert::into_reduction;
31pub use min_max::{new_max_reduction, new_min_reduction};
32use polars_core::prelude::*;
33
34use crate::EvictIdx;
35
36pub trait GroupedReduction: Any + Send + Sync {
40 fn new_empty(&self) -> Box<dyn GroupedReduction>;
42
43 fn reserve(&mut self, additional: usize);
45
46 fn resize(&mut self, num_groups: IdxSize);
51
52 fn update_group(
57 &mut self,
58 values: &[&Column],
59 group_idx: IdxSize,
60 seq_id: u64,
61 ) -> PolarsResult<()>;
62
63 unsafe fn update_groups_subset(
73 &mut self,
74 values: &[&Column],
75 subset: &[IdxSize],
76 group_idxs: &[IdxSize],
77 seq_id: u64,
78 ) -> PolarsResult<()> {
79 assert!(values.len() < (1 << (IdxSize::BITS - 1)));
80 let evict_group_idxs = EvictIdx::cast_slice(group_idxs);
82 self.update_groups_while_evicting(values, subset, evict_group_idxs, seq_id)
83 }
84
85 unsafe fn update_groups_while_evicting(
96 &mut self,
97 values: &[&Column],
98 subset: &[IdxSize],
99 group_idxs: &[EvictIdx],
100 seq_id: u64,
101 ) -> PolarsResult<()>;
102
103 unsafe fn combine_subset(
110 &mut self,
111 other: &dyn GroupedReduction,
112 subset: &[IdxSize],
113 group_idxs: &[IdxSize],
114 ) -> PolarsResult<()>;
115
116 fn take_evictions(&mut self) -> Box<dyn GroupedReduction>;
118
119 fn finalize(&mut self) -> PolarsResult<Series>;
123
124 fn as_any(&self) -> &dyn Any;
126}
127
/// Describes how values of one physical type are folded into a per-group
/// accumulator. The Vec-backed grouped reductions in this file are generic
/// over this trait.
pub trait Reducer: Send + Sync + Clone + 'static {
    /// Physical Polars type the input is viewed as once `cast_series` has
    /// been applied.
    type Dtype: PolarsPhysicalType;
    /// Accumulator that is stored for each group.
    type Value: Clone + Send + Sync + 'static;
    /// Returns the accumulator of a group that has not seen any input yet.
    /// It is also the value a group is reset to after an eviction.
    fn init(&self) -> Self::Value;
    /// Converts the input series into the representation that callers then
    /// view as `ChunkedArray<Self::Dtype>`. The default borrows `s` unchanged.
    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Borrowed(s)
    }
    /// Merges accumulator `b` into accumulator `a`.
    fn combine(&self, a: &mut Self::Value, b: &Self::Value);
    /// Folds a single, possibly missing, value `b` into accumulator `a`.
    ///
    /// The callers in this file pass their own `seq_id + 1`; presumably this
    /// lets order-sensitive reducers order updates (TODO confirm).
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        seq_id: u64,
    );
    /// Folds the whole chunked array `ca` into accumulator `v`.
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
    /// Turns the per-group accumulators `v` into the output series.
    ///
    /// `m` is an optional per-group mask (`None` from the unmasked reduction).
    /// `dtype` is the input dtype the grouped reduction was created with.
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series>;
}
153
/// Stateless description of a reduction over a numeric type, expressed purely
/// in terms of the native value. `NumReducer` adapts it to [`Reducer`].
pub trait NumericReduction: Send + Sync + 'static {
    /// Numeric Polars type that is being reduced.
    type Dtype: PolarsNumericType;
    /// Returns the starting accumulator value.
    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
    /// Combines two native values into one.
    fn combine(
        a: <Self::Dtype as PolarsNumericType>::Native,
        b: <Self::Dtype as PolarsNumericType>::Native,
    ) -> <Self::Dtype as PolarsNumericType>::Native;
    /// Reduces a whole chunked array to a single native value.
    ///
    /// `NumReducer` leaves its accumulator untouched when this returns `None`;
    /// presumably that means the array had no non-null value (TODO confirm).
    fn reduce_ca(
        ca: &ChunkedArray<Self::Dtype>,
    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
}
165
166struct NumReducer<R: NumericReduction>(PhantomData<R>);
167impl<R: NumericReduction> NumReducer<R> {
168 fn new() -> Self {
169 Self(PhantomData)
170 }
171}
172impl<R: NumericReduction> Clone for NumReducer<R> {
173 fn clone(&self) -> Self {
174 Self(PhantomData)
175 }
176}
177
178impl<R: NumericReduction> Reducer for NumReducer<R> {
179 type Dtype = <R as NumericReduction>::Dtype;
180 type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;
181
182 #[inline(always)]
183 fn init(&self) -> Self::Value {
184 <R as NumericReduction>::init()
185 }
186
187 #[inline(always)]
188 fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
189 s.to_physical_repr()
190 }
191
192 #[inline(always)]
193 fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
194 *a = <R as NumericReduction>::combine(*a, *b);
195 }
196
197 #[inline(always)]
198 fn reduce_one(
199 &self,
200 a: &mut Self::Value,
201 b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
202 _seq_id: u64,
203 ) {
204 if let Some(b) = b {
205 *a = <R as NumericReduction>::combine(*a, b);
206 }
207 }
208
209 #[inline(always)]
210 fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
211 if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
212 *v = <R as NumericReduction>::combine(*v, r);
213 }
214 }
215
216 fn finish(
217 &self,
218 v: Vec<Self::Value>,
219 m: Option<Bitmap>,
220 dtype: &DataType,
221 ) -> PolarsResult<Series> {
222 let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
223 Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
224 }
225}
226
227pub struct VecGroupedReduction<R: Reducer> {
228 values: Vec<R::Value>,
229 evicted_values: Vec<R::Value>,
230 in_dtype: DataType,
231 reducer: R,
232}
233
234impl<R: Reducer> VecGroupedReduction<R> {
235 pub fn new(in_dtype: DataType, reducer: R) -> Self {
236 Self {
237 values: Vec::new(),
238 evicted_values: Vec::new(),
239 in_dtype,
240 reducer,
241 }
242 }
243}
244
245impl<R> GroupedReduction for VecGroupedReduction<R>
246where
247 R: Reducer,
248{
249 fn new_empty(&self) -> Box<dyn GroupedReduction> {
250 Box::new(Self {
251 values: Vec::new(),
252 evicted_values: Vec::new(),
253 in_dtype: self.in_dtype.clone(),
254 reducer: self.reducer.clone(),
255 })
256 }
257
258 fn reserve(&mut self, additional: usize) {
259 self.values.reserve(additional);
260 }
261
262 fn resize(&mut self, num_groups: IdxSize) {
263 self.values.resize(num_groups as usize, self.reducer.init());
264 }
265
266 fn update_group(
267 &mut self,
268 values: &[&Column],
269 group_idx: IdxSize,
270 seq_id: u64,
271 ) -> PolarsResult<()> {
272 assert!(values.len() == 1);
273 let values = values[0];
274 assert!(values.dtype() == &self.in_dtype);
275 let seq_id = seq_id + 1; let values = values.as_materialized_series(); let values = self.reducer.cast_series(values);
278 let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
279 self.reducer
280 .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
281 Ok(())
282 }
283
284 unsafe fn update_groups_while_evicting(
285 &mut self,
286 values: &[&Column],
287 subset: &[IdxSize],
288 group_idxs: &[EvictIdx],
289 seq_id: u64,
290 ) -> PolarsResult<()> {
291 let &[values] = values else { unreachable!() };
292 assert!(values.dtype() == &self.in_dtype);
293 assert!(subset.len() == group_idxs.len());
294 let seq_id = seq_id + 1; let values = values.as_materialized_series(); let values = self.reducer.cast_series(values);
297 let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
298 let arr = ca.downcast_as_array();
299 unsafe {
300 if values.has_nulls() {
302 for (i, g) in subset.iter().zip(group_idxs) {
303 let ov = arr.get_unchecked(*i as usize);
304 let grp = self.values.get_unchecked_mut(g.idx());
305 if g.should_evict() {
306 let old = core::mem::replace(grp, self.reducer.init());
307 self.evicted_values.push(old);
308 }
309 self.reducer.reduce_one(grp, ov, seq_id);
310 }
311 } else {
312 for (i, g) in subset.iter().zip(group_idxs) {
313 let v = arr.value_unchecked(*i as usize);
314 let grp = self.values.get_unchecked_mut(g.idx());
315 if g.should_evict() {
316 let old = core::mem::replace(grp, self.reducer.init());
317 self.evicted_values.push(old);
318 }
319 self.reducer.reduce_one(grp, Some(v), seq_id);
320 }
321 }
322 }
323 Ok(())
324 }
325
326 unsafe fn combine_subset(
327 &mut self,
328 other: &dyn GroupedReduction,
329 subset: &[IdxSize],
330 group_idxs: &[IdxSize],
331 ) -> PolarsResult<()> {
332 let other = other.as_any().downcast_ref::<Self>().unwrap();
333 assert!(self.in_dtype == other.in_dtype);
334 assert!(subset.len() == group_idxs.len());
335 unsafe {
336 for (i, g) in subset.iter().zip(group_idxs) {
338 let v = other.values.get_unchecked(*i as usize);
339 let grp = self.values.get_unchecked_mut(*g as usize);
340 self.reducer.combine(grp, v);
341 }
342 }
343 Ok(())
344 }
345
346 fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
347 Box::new(Self {
348 values: core::mem::take(&mut self.evicted_values),
349 evicted_values: Vec::new(),
350 in_dtype: self.in_dtype.clone(),
351 reducer: self.reducer.clone(),
352 })
353 }
354
355 fn finalize(&mut self) -> PolarsResult<Series> {
356 let v = core::mem::take(&mut self.values);
357 self.reducer.finish(v, None, &self.in_dtype)
358 }
359
360 fn as_any(&self) -> &dyn Any {
361 self
362 }
363}
364
365pub struct VecMaskGroupedReduction<R: Reducer> {
366 values: Vec<R::Value>,
367 mask: MutableBitmap,
368 evicted_values: Vec<R::Value>,
369 evicted_mask: BitmapBuilder,
370 in_dtype: DataType,
371 reducer: R,
372}
373
374impl<R: Reducer> VecMaskGroupedReduction<R> {
375 fn new(in_dtype: DataType, reducer: R) -> Self {
376 Self {
377 values: Vec::new(),
378 mask: MutableBitmap::new(),
379 evicted_values: Vec::new(),
380 evicted_mask: BitmapBuilder::new(),
381 in_dtype,
382 reducer,
383 }
384 }
385}
386
387impl<R> GroupedReduction for VecMaskGroupedReduction<R>
388where
389 R: Reducer,
390{
391 fn new_empty(&self) -> Box<dyn GroupedReduction> {
392 Box::new(Self::new(self.in_dtype.clone(), self.reducer.clone()))
393 }
394
395 fn reserve(&mut self, additional: usize) {
396 self.values.reserve(additional);
397 self.mask.reserve(additional)
398 }
399
400 fn resize(&mut self, num_groups: IdxSize) {
401 self.values.resize(num_groups as usize, self.reducer.init());
402 self.mask.resize(num_groups as usize, false);
403 }
404
405 fn update_group(
406 &mut self,
407 values: &[&Column],
408 group_idx: IdxSize,
409 seq_id: u64,
410 ) -> PolarsResult<()> {
411 let &[values] = values else { unreachable!() };
412 assert!(values.dtype() == &self.in_dtype);
413 let seq_id = seq_id + 1; let values = values.as_materialized_series(); let values = self.reducer.cast_series(values);
416 let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
417 self.reducer
418 .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
419 if ca.len() != ca.null_count() {
420 self.mask.set(group_idx as usize, true);
421 }
422 Ok(())
423 }
424
425 unsafe fn update_groups_while_evicting(
426 &mut self,
427 values: &[&Column],
428 subset: &[IdxSize],
429 group_idxs: &[EvictIdx],
430 seq_id: u64,
431 ) -> PolarsResult<()> {
432 let &[values] = values else { unreachable!() };
433 assert!(values.dtype() == &self.in_dtype);
434 assert!(subset.len() == group_idxs.len());
435 let seq_id = seq_id + 1; let values = values.as_materialized_series(); let values = self.reducer.cast_series(values);
438 let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
439 let arr = ca.downcast_as_array();
440 unsafe {
441 for (i, g) in subset.iter().zip(group_idxs) {
443 let ov = arr.get_unchecked(*i as usize);
444 let grp = self.values.get_unchecked_mut(g.idx());
445 if g.should_evict() {
446 self.evicted_values
447 .push(core::mem::replace(grp, self.reducer.init()));
448 self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
449 self.mask.set_unchecked(g.idx(), false);
450 }
451 if let Some(v) = ov {
452 self.reducer.reduce_one(grp, Some(v), seq_id);
453 self.mask.set_unchecked(g.idx(), true);
454 }
455 }
456 }
457 Ok(())
458 }
459
460 unsafe fn combine_subset(
461 &mut self,
462 other: &dyn GroupedReduction,
463 subset: &[IdxSize],
464 group_idxs: &[IdxSize],
465 ) -> PolarsResult<()> {
466 let other = other.as_any().downcast_ref::<Self>().unwrap();
467 assert!(self.in_dtype == other.in_dtype);
468 assert!(subset.len() == group_idxs.len());
469 unsafe {
470 for (i, g) in subset.iter().zip(group_idxs) {
472 let o = other.mask.get_unchecked(*i as usize);
473 if o {
474 let v = other.values.get_unchecked(*i as usize);
475 let grp = self.values.get_unchecked_mut(*g as usize);
476 self.reducer.combine(grp, v);
477 self.mask.set_unchecked(*g as usize, true);
478 }
479 }
480 }
481 Ok(())
482 }
483
484 fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
485 Box::new(Self {
486 values: core::mem::take(&mut self.evicted_values),
487 mask: core::mem::take(&mut self.evicted_mask).into_mut(),
488 evicted_values: Vec::new(),
489 evicted_mask: BitmapBuilder::new(),
490 in_dtype: self.in_dtype.clone(),
491 reducer: self.reducer.clone(),
492 })
493 }
494
495 fn finalize(&mut self) -> PolarsResult<Series> {
496 let v = core::mem::take(&mut self.values);
497 let m = core::mem::take(&mut self.mask);
498 self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
499 }
500
501 fn as_any(&self) -> &dyn Any {
502 self
503 }
504}
505
506#[derive(Clone)]
507pub struct NullGroupedReduction {
508 num_groups: IdxSize,
509 num_evictions: IdxSize,
510 output: Scalar,
511}
512
513impl NullGroupedReduction {
514 pub fn new(output: Scalar) -> Self {
515 Self {
516 num_groups: 0,
517 num_evictions: 0,
518 output,
519 }
520 }
521}
522
523impl GroupedReduction for NullGroupedReduction {
524 fn new_empty(&self) -> Box<dyn GroupedReduction> {
525 Box::new(Self::new(self.output.clone()))
526 }
527
528 fn reserve(&mut self, _additional: usize) {}
529
530 fn resize(&mut self, num_groups: IdxSize) {
531 self.num_groups = num_groups;
532 }
533
534 fn update_group(
535 &mut self,
536 values: &[&Column],
537 _group_idx: IdxSize,
538 _seq_id: u64,
539 ) -> PolarsResult<()> {
540 assert!(!values.is_empty());
541 assert!(values.iter().any(|v| v.dtype().is_null()));
542 Ok(())
543 }
544
545 unsafe fn update_groups_while_evicting(
546 &mut self,
547 values: &[&Column],
548 subset: &[IdxSize],
549 group_idxs: &[EvictIdx],
550 _seq_id: u64,
551 ) -> PolarsResult<()> {
552 assert!(!values.is_empty());
553 assert!(values.iter().any(|v| v.dtype().is_null()));
554 assert!(subset.len() == group_idxs.len());
555 for g in group_idxs {
556 self.num_evictions += g.should_evict() as IdxSize;
557 }
558 Ok(())
559 }
560
561 unsafe fn combine_subset(
562 &mut self,
563 other: &dyn GroupedReduction,
564 subset: &[IdxSize],
565 group_idxs: &[IdxSize],
566 ) -> PolarsResult<()> {
567 let _other = other.as_any().downcast_ref::<Self>().unwrap();
568 assert!(subset.len() == group_idxs.len());
569 Ok(())
570 }
571
572 fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
573 Box::new(Self {
574 num_groups: core::mem::replace(&mut self.num_evictions, 0),
575 num_evictions: 0,
576 output: self.output.clone(),
577 })
578 }
579
580 fn finalize(&mut self) -> PolarsResult<Series> {
581 let length = core::mem::replace(&mut self.num_groups, 0) as usize;
582 let s = self.output.clone().into_series(PlSmallStr::EMPTY);
583 Ok(s.new_from_index(0, length))
584 }
585
586 fn as_any(&self) -> &dyn Any {
587 self
588 }
589}