1mod convert;
2mod first_last;
3mod len;
4mod mean;
5mod min_max;
6mod partition;
7mod sum;
8mod var_std;
9
10use std::any::Any;
11use std::borrow::Cow;
12use std::marker::PhantomData;
13
14use arrow::array::{Array, PrimitiveArray, StaticArray};
15use arrow::bitmap::{Bitmap, MutableBitmap};
16pub use convert::into_reduction;
17use polars_core::prelude::*;
18
19pub trait GroupedReduction: Any + Send + Sync {
23 fn new_empty(&self) -> Box<dyn GroupedReduction>;
25
26 fn reserve(&mut self, additional: usize);
28
29 fn resize(&mut self, num_groups: IdxSize);
34
35 fn update_group(
40 &mut self,
41 values: &Series,
42 group_idx: IdxSize,
43 seq_id: u64,
44 ) -> PolarsResult<()>;
45
46 unsafe fn update_groups(
54 &mut self,
55 values: &Series,
56 group_idxs: &[IdxSize],
57 seq_id: u64,
58 ) -> PolarsResult<()>;
59
60 unsafe fn combine(
66 &mut self,
67 other: &dyn GroupedReduction,
68 group_idxs: &[IdxSize],
69 ) -> PolarsResult<()>;
70
71 unsafe fn gather_combine(
78 &mut self,
79 other: &dyn GroupedReduction,
80 subset: &[IdxSize],
81 group_idxs: &[IdxSize],
82 ) -> PolarsResult<()>;
83
84 unsafe fn partition(
95 self: Box<Self>,
96 partition_sizes: &[IdxSize],
97 partition_idxs: &[IdxSize],
98 ) -> Vec<Box<dyn GroupedReduction>>;
99
100 fn finalize(&mut self) -> PolarsResult<Series>;
104
105 fn as_any(&self) -> &dyn Any;
107}
108
109pub trait Reducer: Send + Sync + Clone + 'static {
112 type Dtype: PolarsDataType<IsLogical = FalseT>;
113 type Value: Clone + Send + Sync + 'static;
114 fn init(&self) -> Self::Value;
115 #[inline(always)]
116 fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
117 Cow::Borrowed(s)
118 }
119 fn combine(&self, a: &mut Self::Value, b: &Self::Value);
120 fn reduce_one(
121 &self,
122 a: &mut Self::Value,
123 b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
124 seq_id: u64,
125 );
126 fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
127 fn finish(
128 &self,
129 v: Vec<Self::Value>,
130 m: Option<Bitmap>,
131 dtype: &DataType,
132 ) -> PolarsResult<Series>;
133}
134
135pub trait NumericReduction: Send + Sync + 'static {
136 type Dtype: PolarsNumericType;
137 fn init() -> <Self::Dtype as PolarsNumericType>::Native;
138 fn combine(
139 a: <Self::Dtype as PolarsNumericType>::Native,
140 b: <Self::Dtype as PolarsNumericType>::Native,
141 ) -> <Self::Dtype as PolarsNumericType>::Native;
142 fn reduce_ca(
143 ca: &ChunkedArray<Self::Dtype>,
144 ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
145}
146
147struct NumReducer<R: NumericReduction>(PhantomData<R>);
148impl<R: NumericReduction> NumReducer<R> {
149 fn new() -> Self {
150 Self(PhantomData)
151 }
152}
153impl<R: NumericReduction> Clone for NumReducer<R> {
154 fn clone(&self) -> Self {
155 Self(PhantomData)
156 }
157}
158
159impl<R: NumericReduction> Reducer for NumReducer<R> {
160 type Dtype = <R as NumericReduction>::Dtype;
161 type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;
162
163 #[inline(always)]
164 fn init(&self) -> Self::Value {
165 <R as NumericReduction>::init()
166 }
167
168 #[inline(always)]
169 fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
170 s.to_physical_repr()
171 }
172
173 #[inline(always)]
174 fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
175 *a = <R as NumericReduction>::combine(*a, *b);
176 }
177
178 #[inline(always)]
179 fn reduce_one(
180 &self,
181 a: &mut Self::Value,
182 b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
183 _seq_id: u64,
184 ) {
185 if let Some(b) = b {
186 *a = <R as NumericReduction>::combine(*a, b);
187 }
188 }
189
190 #[inline(always)]
191 fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
192 if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
193 *v = <R as NumericReduction>::combine(*v, r);
194 }
195 }
196
197 fn finish(
198 &self,
199 v: Vec<Self::Value>,
200 m: Option<Bitmap>,
201 dtype: &DataType,
202 ) -> PolarsResult<Series> {
203 let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
204 Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
205 }
206}
207
208pub struct VecGroupedReduction<R: Reducer> {
209 values: Vec<R::Value>,
210 in_dtype: DataType,
211 reducer: R,
212}
213
214impl<R: Reducer> VecGroupedReduction<R> {
215 fn new(in_dtype: DataType, reducer: R) -> Self {
216 Self {
217 values: Vec::new(),
218 in_dtype,
219 reducer,
220 }
221 }
222}
223
224impl<R> GroupedReduction for VecGroupedReduction<R>
225where
226 R: Reducer,
227{
228 fn new_empty(&self) -> Box<dyn GroupedReduction> {
229 Box::new(Self {
230 values: Vec::new(),
231 in_dtype: self.in_dtype.clone(),
232 reducer: self.reducer.clone(),
233 })
234 }
235
236 fn reserve(&mut self, additional: usize) {
237 self.values.reserve(additional);
238 }
239
240 fn resize(&mut self, num_groups: IdxSize) {
241 self.values.resize(num_groups as usize, self.reducer.init());
242 }
243
244 fn update_group(
245 &mut self,
246 values: &Series,
247 group_idx: IdxSize,
248 seq_id: u64,
249 ) -> PolarsResult<()> {
250 assert!(values.dtype() == &self.in_dtype);
251 let seq_id = seq_id + 1; let values = self.reducer.cast_series(values);
253 let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
254 self.reducer
255 .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
256 Ok(())
257 }
258
259 unsafe fn update_groups(
260 &mut self,
261 values: &Series,
262 group_idxs: &[IdxSize],
263 seq_id: u64,
264 ) -> PolarsResult<()> {
265 assert!(values.dtype() == &self.in_dtype);
266 assert!(values.len() == group_idxs.len());
267 let seq_id = seq_id + 1; let values = self.reducer.cast_series(values);
269 let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
270 unsafe {
271 if values.has_nulls() {
273 for (g, ov) in group_idxs.iter().zip(ca.iter()) {
274 let grp = self.values.get_unchecked_mut(*g as usize);
275 self.reducer.reduce_one(grp, ov, seq_id);
276 }
277 } else {
278 let mut offset = 0;
279 for arr in ca.downcast_iter() {
280 let subgroup = &group_idxs[offset..offset + arr.len()];
281 for (g, v) in subgroup.iter().zip(arr.values_iter()) {
282 let grp = self.values.get_unchecked_mut(*g as usize);
283 self.reducer.reduce_one(grp, Some(v), seq_id);
284 }
285 offset += arr.len();
286 }
287 }
288 }
289 Ok(())
290 }
291
292 unsafe fn combine(
293 &mut self,
294 other: &dyn GroupedReduction,
295 group_idxs: &[IdxSize],
296 ) -> PolarsResult<()> {
297 let other = other.as_any().downcast_ref::<Self>().unwrap();
298 assert!(self.in_dtype == other.in_dtype);
299 assert!(group_idxs.len() == other.values.len());
300 unsafe {
301 for (g, v) in group_idxs.iter().zip(other.values.iter()) {
303 let grp = self.values.get_unchecked_mut(*g as usize);
304 self.reducer.combine(grp, v);
305 }
306 }
307 Ok(())
308 }
309
310 unsafe fn gather_combine(
311 &mut self,
312 other: &dyn GroupedReduction,
313 subset: &[IdxSize],
314 group_idxs: &[IdxSize],
315 ) -> PolarsResult<()> {
316 let other = other.as_any().downcast_ref::<Self>().unwrap();
317 assert!(self.in_dtype == other.in_dtype);
318 assert!(subset.len() == group_idxs.len());
319 unsafe {
320 for (i, g) in subset.iter().zip(group_idxs) {
322 let v = other.values.get_unchecked(*i as usize);
323 let grp = self.values.get_unchecked_mut(*g as usize);
324 self.reducer.combine(grp, v);
325 }
326 }
327 Ok(())
328 }
329
330 unsafe fn partition(
331 self: Box<Self>,
332 partition_sizes: &[IdxSize],
333 partition_idxs: &[IdxSize],
334 ) -> Vec<Box<dyn GroupedReduction>> {
335 partition::partition_vec(self.values, partition_sizes, partition_idxs)
336 .into_iter()
337 .map(|values| {
338 Box::new(Self {
339 values,
340 in_dtype: self.in_dtype.clone(),
341 reducer: self.reducer.clone(),
342 }) as _
343 })
344 .collect()
345 }
346
347 fn finalize(&mut self) -> PolarsResult<Series> {
348 let v = core::mem::take(&mut self.values);
349 self.reducer.finish(v, None, &self.in_dtype)
350 }
351
352 fn as_any(&self) -> &dyn Any {
353 self
354 }
355}
356
357pub struct VecMaskGroupedReduction<R: Reducer> {
358 values: Vec<R::Value>,
359 mask: MutableBitmap,
360 in_dtype: DataType,
361 reducer: R,
362}
363
364impl<R: Reducer> VecMaskGroupedReduction<R> {
365 fn new(in_dtype: DataType, reducer: R) -> Self {
366 Self {
367 values: Vec::new(),
368 mask: MutableBitmap::new(),
369 in_dtype,
370 reducer,
371 }
372 }
373}
374
375impl<R> GroupedReduction for VecMaskGroupedReduction<R>
376where
377 R: Reducer,
378{
379 fn new_empty(&self) -> Box<dyn GroupedReduction> {
380 Box::new(Self {
381 values: Vec::new(),
382 mask: MutableBitmap::new(),
383 in_dtype: self.in_dtype.clone(),
384 reducer: self.reducer.clone(),
385 })
386 }
387
388 fn reserve(&mut self, additional: usize) {
389 self.values.reserve(additional);
390 self.mask.reserve(additional)
391 }
392
393 fn resize(&mut self, num_groups: IdxSize) {
394 self.values.resize(num_groups as usize, self.reducer.init());
395 self.mask.resize(num_groups as usize, false);
396 }
397
398 fn update_group(
399 &mut self,
400 values: &Series,
401 group_idx: IdxSize,
402 seq_id: u64,
403 ) -> PolarsResult<()> {
404 assert!(values.dtype() == &self.in_dtype);
407 let seq_id = seq_id + 1; let values = values.to_physical_repr();
409 let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
410 self.reducer
411 .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
412 if ca.len() != ca.null_count() {
413 self.mask.set(group_idx as usize, true);
414 }
415 Ok(())
416 }
417
418 unsafe fn update_groups(
419 &mut self,
420 values: &Series,
421 group_idxs: &[IdxSize],
422 seq_id: u64,
423 ) -> PolarsResult<()> {
424 assert!(values.dtype() == &self.in_dtype);
427 assert!(values.len() == group_idxs.len());
428 let seq_id = seq_id + 1; let values = values.to_physical_repr();
430 let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
431 unsafe {
432 for (g, ov) in group_idxs.iter().zip(ca.iter()) {
434 if let Some(v) = ov {
435 let grp = self.values.get_unchecked_mut(*g as usize);
436 self.reducer.reduce_one(grp, Some(v), seq_id);
437 self.mask.set_unchecked(*g as usize, true);
438 }
439 }
440 }
441 Ok(())
442 }
443
444 unsafe fn combine(
445 &mut self,
446 other: &dyn GroupedReduction,
447 group_idxs: &[IdxSize],
448 ) -> PolarsResult<()> {
449 let other = other.as_any().downcast_ref::<Self>().unwrap();
450 assert!(self.in_dtype == other.in_dtype);
451 assert!(group_idxs.len() == other.values.len());
452 unsafe {
453 for (g, (v, o)) in group_idxs
455 .iter()
456 .zip(other.values.iter().zip(other.mask.iter()))
457 {
458 if o {
459 let grp = self.values.get_unchecked_mut(*g as usize);
460 self.reducer.combine(grp, v);
461 self.mask.set_unchecked(*g as usize, true);
462 }
463 }
464 }
465 Ok(())
466 }
467
468 unsafe fn gather_combine(
469 &mut self,
470 other: &dyn GroupedReduction,
471 subset: &[IdxSize],
472 group_idxs: &[IdxSize],
473 ) -> PolarsResult<()> {
474 let other = other.as_any().downcast_ref::<Self>().unwrap();
475 assert!(self.in_dtype == other.in_dtype);
476 assert!(subset.len() == group_idxs.len());
477 unsafe {
478 for (i, g) in subset.iter().zip(group_idxs) {
480 let o = other.mask.get_unchecked(*i as usize);
481 if o {
482 let v = other.values.get_unchecked(*i as usize);
483 let grp = self.values.get_unchecked_mut(*g as usize);
484 self.reducer.combine(grp, v);
485 self.mask.set_unchecked(*g as usize, true);
486 }
487 }
488 }
489 Ok(())
490 }
491
492 unsafe fn partition(
493 self: Box<Self>,
494 partition_sizes: &[IdxSize],
495 partition_idxs: &[IdxSize],
496 ) -> Vec<Box<dyn GroupedReduction>> {
497 partition::partition_vec_mask(
498 self.values,
499 &self.mask.freeze(),
500 partition_sizes,
501 partition_idxs,
502 )
503 .into_iter()
504 .map(|(values, mask)| {
505 Box::new(Self {
506 values,
507 mask: mask.into_mut(),
508 in_dtype: self.in_dtype.clone(),
509 reducer: self.reducer.clone(),
510 }) as _
511 })
512 .collect()
513 }
514
515 fn finalize(&mut self) -> PolarsResult<Series> {
516 let v = core::mem::take(&mut self.values);
517 let m = core::mem::take(&mut self.mask);
518 self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
519 }
520
521 fn as_any(&self) -> &dyn Any {
522 self
523 }
524}
525
526struct NullGroupedReduction {
527 dtype: DataType,
528 num_groups: IdxSize,
529}
530
531impl NullGroupedReduction {
532 fn new(dtype: DataType) -> Self {
533 Self {
534 dtype,
535 num_groups: 0,
536 }
537 }
538}
539
540impl GroupedReduction for NullGroupedReduction {
541 fn new_empty(&self) -> Box<dyn GroupedReduction> {
542 Box::new(Self {
543 dtype: self.dtype.clone(),
544 num_groups: self.num_groups,
545 })
546 }
547
548 fn reserve(&mut self, _additional: usize) {}
549
550 fn resize(&mut self, num_groups: IdxSize) {
551 self.num_groups = num_groups;
552 }
553
554 fn update_group(
555 &mut self,
556 _values: &Series,
557 _group_idx: IdxSize,
558 _seq_id: u64,
559 ) -> PolarsResult<()> {
560 Ok(())
561 }
562
563 unsafe fn update_groups(
564 &mut self,
565 _values: &Series,
566 _group_idxs: &[IdxSize],
567 _seq_id: u64,
568 ) -> PolarsResult<()> {
569 Ok(())
570 }
571
572 unsafe fn combine(
573 &mut self,
574 _other: &dyn GroupedReduction,
575 _group_idxs: &[IdxSize],
576 ) -> PolarsResult<()> {
577 Ok(())
578 }
579
580 unsafe fn gather_combine(
581 &mut self,
582 _other: &dyn GroupedReduction,
583 _subset: &[IdxSize],
584 _group_idxs: &[IdxSize],
585 ) -> PolarsResult<()> {
586 Ok(())
587 }
588
589 unsafe fn partition(
590 self: Box<Self>,
591 partition_sizes: &[IdxSize],
592 _partition_idxs: &[IdxSize],
593 ) -> Vec<Box<dyn GroupedReduction>> {
594 partition_sizes
595 .iter()
596 .map(|&num_groups| {
597 Box::new(Self {
598 dtype: self.dtype.clone(),
599 num_groups,
600 }) as _
601 })
602 .collect()
603 }
604
605 fn finalize(&mut self) -> PolarsResult<Series> {
606 let num_groups = self.num_groups;
607 self.num_groups = 0;
608 Ok(Series::full_null(
609 PlSmallStr::EMPTY,
610 num_groups as usize,
611 &self.dtype,
612 ))
613 }
614
615 fn as_any(&self) -> &dyn Any {
616 self
617 }
618}