polars_expr/reduce/
mod.rs

1mod convert;
2mod first_last;
3mod len;
4mod mean;
5mod min_max;
6mod partition;
7mod sum;
8mod var_std;
9
10use std::any::Any;
11use std::borrow::Cow;
12use std::marker::PhantomData;
13
14use arrow::array::{Array, PrimitiveArray, StaticArray};
15use arrow::bitmap::{Bitmap, MutableBitmap};
16pub use convert::into_reduction;
17use polars_core::prelude::*;
18
/// A reduction with groups.
///
/// Each group has its own reduction state that values can be aggregated into.
/// Groups are addressed by index; the valid indices are `0..num_groups`, where
/// `num_groups` is the value passed to the most recent call to
/// [`resize`](GroupedReduction::resize).
pub trait GroupedReduction: Any + Send + Sync {
    /// Returns a new empty reduction.
    fn new_empty(&self) -> Box<dyn GroupedReduction>;

    /// Reserves space in this GroupedReduction for an additional number of groups.
    fn reserve(&mut self, additional: usize);

    /// Resizes this GroupedReduction to the given number of groups.
    ///
    /// While not an actual member of the trait, the safety preconditions below
    /// refer to self.num_groups() as given by the last call of this function.
    fn resize(&mut self, num_groups: IdxSize);

    /// Updates the specified group with the given values.
    ///
    /// All of `values` is aggregated into the single group `group_idx`.
    ///
    /// For order-sensitive grouped reductions, seq_id can be used to resolve
    /// order between calls/multiple reductions.
    fn update_group(
        &mut self,
        values: &Series,
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Updates this GroupedReduction with new values. values[i] should
    /// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
    /// reductions, seq_id can be used to resolve order between calls/multiple
    /// reductions.
    ///
    /// # Safety
    /// group_idxs[i] < self.num_groups() for all i.
    unsafe fn update_groups(
        &mut self,
        values: &Series,
        group_idxs: &[IdxSize],
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Combines this GroupedReduction with another. Group other[i]
    /// should be combined into group self[group_idxs[i]].
    ///
    /// # Safety
    /// group_idxs[i] < self.num_groups() for all i.
    unsafe fn combine(
        &mut self,
        other: &dyn GroupedReduction,
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()>;

    /// Combines this GroupedReduction with another. Group other[subset[i]]
    /// should be combined into group self[group_idxs[i]].
    ///
    /// # Safety
    /// subset[i] < other.num_groups() for all i.
    /// group_idxs[i] < self.num_groups() for all i.
    unsafe fn gather_combine(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()>;

    /// Partitions this GroupedReduction into several partitions.
    ///
    /// The ith group of this GroupedReduction should become the group_idxs[i]
    /// group in partition partition_idxs[i].
    ///
    /// NOTE(review): `group_idxs` is not a parameter of this function; how the
    /// index of a group inside its target partition is determined is not
    /// visible from this signature and should be confirmed against the
    /// implementation in the `partition` module.
    ///
    /// # Safety
    /// partition_idxs[i] < partition_sizes.len() for all i.
    /// group_idxs[i] < partition_sizes[partition_idxs[i]] for all i.
    /// Each partition p has an associated set of group_idxs, this set contains
    /// 0..partition_sizes[p] exactly once.
    unsafe fn partition(
        self: Box<Self>,
        partition_sizes: &[IdxSize],
        partition_idxs: &[IdxSize],
    ) -> Vec<Box<dyn GroupedReduction>>;

    /// Returns the finalized value per group as a Series.
    ///
    /// After this operation the number of groups is reset to 0.
    fn finalize(&mut self) -> PolarsResult<Series>;

    /// Returns this GroupedReduction as a dyn Any.
    ///
    /// The implementations in this file use it to downcast the `other`
    /// argument of `combine`/`gather_combine` back to their concrete type.
    fn as_any(&self) -> &dyn Any;
}
108
// Helper traits used in the VecGroupedReduction and VecMaskGroupedReduction to
// reduce code duplication.
/// Per-group reduction logic plugged into `VecGroupedReduction` and
/// `VecMaskGroupedReduction`.
///
/// Those wrappers own the per-group storage (a `Vec<Self::Value>`) and call
/// back into the reducer to initialise, update, merge and finalise the
/// individual group states.
pub trait Reducer: Send + Sync + Clone + 'static {
    /// Data type of the `ChunkedArray` that is handed to `reduce_ca`, and
    /// whose physical element type is handed to `reduce_one`.
    type Dtype: PolarsDataType<IsLogical = FalseT>;
    /// State stored for every group.
    type Value: Clone + Send + Sync + 'static;
    /// Returns the initial state of a group. The wrappers use it as the fill
    /// value when their storage is resized.
    fn init(&self) -> Self::Value;
    /// Converts an input series into the representation that is subsequently
    /// viewed as a `ChunkedArray<Self::Dtype>`.
    ///
    /// The default implementation returns the series borrowed and unchanged.
    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Borrowed(s)
    }
    /// Merges the group state `b` into the group state `a`.
    fn combine(&self, a: &mut Self::Value, b: &Self::Value);
    /// Folds a single, possibly-null value `b` into the group state `a`.
    ///
    /// The wrappers pass `seq_id + 1` for the `seq_id` they received, so that
    /// 0 can be used for 'none yet'.
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        seq_id: u64,
    );
    /// Folds all values of `ca` into the single group state `v`.
    ///
    /// `seq_id` follows the same convention as in `reduce_one`.
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
    /// Turns the per-group states `v` into the output series.
    ///
    /// `m` is an optional validity bitmap with one bit per group
    /// (`VecGroupedReduction` passes `None`, `VecMaskGroupedReduction` passes
    /// its mask). `dtype` is the dtype of the input series the wrapper was
    /// constructed with.
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series>;
}
134
/// A stateless reduction over a numeric type whose per-group state is a
/// single native value.
///
/// All functions are associated functions (no `self`); `NumReducer` adapts an
/// implementor of this trait to the `Reducer` interface.
pub trait NumericReduction: Send + Sync + 'static {
    /// Numeric Polars type this reduction operates on.
    type Dtype: PolarsNumericType;
    /// Returns the initial state of a group.
    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
    /// Combines two native values into one. `NumReducer` uses this both to
    /// fold a new value into a group state and to merge two group states.
    fn combine(
        a: <Self::Dtype as PolarsNumericType>::Native,
        b: <Self::Dtype as PolarsNumericType>::Native,
    ) -> <Self::Dtype as PolarsNumericType>::Native;
    /// Reduces a whole chunked array to a single native value.
    ///
    /// When this returns `None`, `NumReducer` leaves the group state untouched.
    /// NOTE(review): presumably `None` means "no non-null values" — confirm
    /// against the implementors.
    fn reduce_ca(
        ca: &ChunkedArray<Self::Dtype>,
    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
}
146
147struct NumReducer<R: NumericReduction>(PhantomData<R>);
148impl<R: NumericReduction> NumReducer<R> {
149    fn new() -> Self {
150        Self(PhantomData)
151    }
152}
153impl<R: NumericReduction> Clone for NumReducer<R> {
154    fn clone(&self) -> Self {
155        Self(PhantomData)
156    }
157}
158
159impl<R: NumericReduction> Reducer for NumReducer<R> {
160    type Dtype = <R as NumericReduction>::Dtype;
161    type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;
162
163    #[inline(always)]
164    fn init(&self) -> Self::Value {
165        <R as NumericReduction>::init()
166    }
167
168    #[inline(always)]
169    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
170        s.to_physical_repr()
171    }
172
173    #[inline(always)]
174    fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
175        *a = <R as NumericReduction>::combine(*a, *b);
176    }
177
178    #[inline(always)]
179    fn reduce_one(
180        &self,
181        a: &mut Self::Value,
182        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
183        _seq_id: u64,
184    ) {
185        if let Some(b) = b {
186            *a = <R as NumericReduction>::combine(*a, b);
187        }
188    }
189
190    #[inline(always)]
191    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
192        if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
193            *v = <R as NumericReduction>::combine(*v, r);
194        }
195    }
196
197    fn finish(
198        &self,
199        v: Vec<Self::Value>,
200        m: Option<Bitmap>,
201        dtype: &DataType,
202    ) -> PolarsResult<Series> {
203        let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
204        Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
205    }
206}
207
208pub struct VecGroupedReduction<R: Reducer> {
209    values: Vec<R::Value>,
210    in_dtype: DataType,
211    reducer: R,
212}
213
214impl<R: Reducer> VecGroupedReduction<R> {
215    fn new(in_dtype: DataType, reducer: R) -> Self {
216        Self {
217            values: Vec::new(),
218            in_dtype,
219            reducer,
220        }
221    }
222}
223
224impl<R> GroupedReduction for VecGroupedReduction<R>
225where
226    R: Reducer,
227{
228    fn new_empty(&self) -> Box<dyn GroupedReduction> {
229        Box::new(Self {
230            values: Vec::new(),
231            in_dtype: self.in_dtype.clone(),
232            reducer: self.reducer.clone(),
233        })
234    }
235
236    fn reserve(&mut self, additional: usize) {
237        self.values.reserve(additional);
238    }
239
240    fn resize(&mut self, num_groups: IdxSize) {
241        self.values.resize(num_groups as usize, self.reducer.init());
242    }
243
244    fn update_group(
245        &mut self,
246        values: &Series,
247        group_idx: IdxSize,
248        seq_id: u64,
249    ) -> PolarsResult<()> {
250        assert!(values.dtype() == &self.in_dtype);
251        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
252        let values = self.reducer.cast_series(values);
253        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
254        self.reducer
255            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
256        Ok(())
257    }
258
259    unsafe fn update_groups(
260        &mut self,
261        values: &Series,
262        group_idxs: &[IdxSize],
263        seq_id: u64,
264    ) -> PolarsResult<()> {
265        assert!(values.dtype() == &self.in_dtype);
266        assert!(values.len() == group_idxs.len());
267        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
268        let values = self.reducer.cast_series(values);
269        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
270        unsafe {
271            // SAFETY: indices are in-bounds guaranteed by trait.
272            if values.has_nulls() {
273                for (g, ov) in group_idxs.iter().zip(ca.iter()) {
274                    let grp = self.values.get_unchecked_mut(*g as usize);
275                    self.reducer.reduce_one(grp, ov, seq_id);
276                }
277            } else {
278                let mut offset = 0;
279                for arr in ca.downcast_iter() {
280                    let subgroup = &group_idxs[offset..offset + arr.len()];
281                    for (g, v) in subgroup.iter().zip(arr.values_iter()) {
282                        let grp = self.values.get_unchecked_mut(*g as usize);
283                        self.reducer.reduce_one(grp, Some(v), seq_id);
284                    }
285                    offset += arr.len();
286                }
287            }
288        }
289        Ok(())
290    }
291
292    unsafe fn combine(
293        &mut self,
294        other: &dyn GroupedReduction,
295        group_idxs: &[IdxSize],
296    ) -> PolarsResult<()> {
297        let other = other.as_any().downcast_ref::<Self>().unwrap();
298        assert!(self.in_dtype == other.in_dtype);
299        assert!(group_idxs.len() == other.values.len());
300        unsafe {
301            // SAFETY: indices are in-bounds guaranteed by trait.
302            for (g, v) in group_idxs.iter().zip(other.values.iter()) {
303                let grp = self.values.get_unchecked_mut(*g as usize);
304                self.reducer.combine(grp, v);
305            }
306        }
307        Ok(())
308    }
309
310    unsafe fn gather_combine(
311        &mut self,
312        other: &dyn GroupedReduction,
313        subset: &[IdxSize],
314        group_idxs: &[IdxSize],
315    ) -> PolarsResult<()> {
316        let other = other.as_any().downcast_ref::<Self>().unwrap();
317        assert!(self.in_dtype == other.in_dtype);
318        assert!(subset.len() == group_idxs.len());
319        unsafe {
320            // SAFETY: indices are in-bounds guaranteed by trait.
321            for (i, g) in subset.iter().zip(group_idxs) {
322                let v = other.values.get_unchecked(*i as usize);
323                let grp = self.values.get_unchecked_mut(*g as usize);
324                self.reducer.combine(grp, v);
325            }
326        }
327        Ok(())
328    }
329
330    unsafe fn partition(
331        self: Box<Self>,
332        partition_sizes: &[IdxSize],
333        partition_idxs: &[IdxSize],
334    ) -> Vec<Box<dyn GroupedReduction>> {
335        partition::partition_vec(self.values, partition_sizes, partition_idxs)
336            .into_iter()
337            .map(|values| {
338                Box::new(Self {
339                    values,
340                    in_dtype: self.in_dtype.clone(),
341                    reducer: self.reducer.clone(),
342                }) as _
343            })
344            .collect()
345    }
346
347    fn finalize(&mut self) -> PolarsResult<Series> {
348        let v = core::mem::take(&mut self.values);
349        self.reducer.finish(v, None, &self.in_dtype)
350    }
351
352    fn as_any(&self) -> &dyn Any {
353        self
354    }
355}
356
357pub struct VecMaskGroupedReduction<R: Reducer> {
358    values: Vec<R::Value>,
359    mask: MutableBitmap,
360    in_dtype: DataType,
361    reducer: R,
362}
363
364impl<R: Reducer> VecMaskGroupedReduction<R> {
365    fn new(in_dtype: DataType, reducer: R) -> Self {
366        Self {
367            values: Vec::new(),
368            mask: MutableBitmap::new(),
369            in_dtype,
370            reducer,
371        }
372    }
373}
374
375impl<R> GroupedReduction for VecMaskGroupedReduction<R>
376where
377    R: Reducer,
378{
379    fn new_empty(&self) -> Box<dyn GroupedReduction> {
380        Box::new(Self {
381            values: Vec::new(),
382            mask: MutableBitmap::new(),
383            in_dtype: self.in_dtype.clone(),
384            reducer: self.reducer.clone(),
385        })
386    }
387
388    fn reserve(&mut self, additional: usize) {
389        self.values.reserve(additional);
390        self.mask.reserve(additional)
391    }
392
393    fn resize(&mut self, num_groups: IdxSize) {
394        self.values.resize(num_groups as usize, self.reducer.init());
395        self.mask.resize(num_groups as usize, false);
396    }
397
398    fn update_group(
399        &mut self,
400        values: &Series,
401        group_idx: IdxSize,
402        seq_id: u64,
403    ) -> PolarsResult<()> {
404        // TODO: we should really implement a sum-as-other-type operation instead
405        // of doing this materialized cast.
406        assert!(values.dtype() == &self.in_dtype);
407        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
408        let values = values.to_physical_repr();
409        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
410        self.reducer
411            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
412        if ca.len() != ca.null_count() {
413            self.mask.set(group_idx as usize, true);
414        }
415        Ok(())
416    }
417
418    unsafe fn update_groups(
419        &mut self,
420        values: &Series,
421        group_idxs: &[IdxSize],
422        seq_id: u64,
423    ) -> PolarsResult<()> {
424        // TODO: we should really implement a sum-as-other-type operation instead
425        // of doing this materialized cast.
426        assert!(values.dtype() == &self.in_dtype);
427        assert!(values.len() == group_idxs.len());
428        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
429        let values = values.to_physical_repr();
430        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
431        unsafe {
432            // SAFETY: indices are in-bounds guaranteed by trait.
433            for (g, ov) in group_idxs.iter().zip(ca.iter()) {
434                if let Some(v) = ov {
435                    let grp = self.values.get_unchecked_mut(*g as usize);
436                    self.reducer.reduce_one(grp, Some(v), seq_id);
437                    self.mask.set_unchecked(*g as usize, true);
438                }
439            }
440        }
441        Ok(())
442    }
443
444    unsafe fn combine(
445        &mut self,
446        other: &dyn GroupedReduction,
447        group_idxs: &[IdxSize],
448    ) -> PolarsResult<()> {
449        let other = other.as_any().downcast_ref::<Self>().unwrap();
450        assert!(self.in_dtype == other.in_dtype);
451        assert!(group_idxs.len() == other.values.len());
452        unsafe {
453            // SAFETY: indices are in-bounds guaranteed by trait.
454            for (g, (v, o)) in group_idxs
455                .iter()
456                .zip(other.values.iter().zip(other.mask.iter()))
457            {
458                if o {
459                    let grp = self.values.get_unchecked_mut(*g as usize);
460                    self.reducer.combine(grp, v);
461                    self.mask.set_unchecked(*g as usize, true);
462                }
463            }
464        }
465        Ok(())
466    }
467
468    unsafe fn gather_combine(
469        &mut self,
470        other: &dyn GroupedReduction,
471        subset: &[IdxSize],
472        group_idxs: &[IdxSize],
473    ) -> PolarsResult<()> {
474        let other = other.as_any().downcast_ref::<Self>().unwrap();
475        assert!(self.in_dtype == other.in_dtype);
476        assert!(subset.len() == group_idxs.len());
477        unsafe {
478            // SAFETY: indices are in-bounds guaranteed by trait.
479            for (i, g) in subset.iter().zip(group_idxs) {
480                let o = other.mask.get_unchecked(*i as usize);
481                if o {
482                    let v = other.values.get_unchecked(*i as usize);
483                    let grp = self.values.get_unchecked_mut(*g as usize);
484                    self.reducer.combine(grp, v);
485                    self.mask.set_unchecked(*g as usize, true);
486                }
487            }
488        }
489        Ok(())
490    }
491
492    unsafe fn partition(
493        self: Box<Self>,
494        partition_sizes: &[IdxSize],
495        partition_idxs: &[IdxSize],
496    ) -> Vec<Box<dyn GroupedReduction>> {
497        partition::partition_vec_mask(
498            self.values,
499            &self.mask.freeze(),
500            partition_sizes,
501            partition_idxs,
502        )
503        .into_iter()
504        .map(|(values, mask)| {
505            Box::new(Self {
506                values,
507                mask: mask.into_mut(),
508                in_dtype: self.in_dtype.clone(),
509                reducer: self.reducer.clone(),
510            }) as _
511        })
512        .collect()
513    }
514
515    fn finalize(&mut self) -> PolarsResult<Series> {
516        let v = core::mem::take(&mut self.values);
517        let m = core::mem::take(&mut self.mask);
518        self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
519    }
520
521    fn as_any(&self) -> &dyn Any {
522        self
523    }
524}
525
526struct NullGroupedReduction {
527    dtype: DataType,
528    num_groups: IdxSize,
529}
530
531impl NullGroupedReduction {
532    fn new(dtype: DataType) -> Self {
533        Self {
534            dtype,
535            num_groups: 0,
536        }
537    }
538}
539
540impl GroupedReduction for NullGroupedReduction {
541    fn new_empty(&self) -> Box<dyn GroupedReduction> {
542        Box::new(Self {
543            dtype: self.dtype.clone(),
544            num_groups: self.num_groups,
545        })
546    }
547
548    fn reserve(&mut self, _additional: usize) {}
549
550    fn resize(&mut self, num_groups: IdxSize) {
551        self.num_groups = num_groups;
552    }
553
554    fn update_group(
555        &mut self,
556        _values: &Series,
557        _group_idx: IdxSize,
558        _seq_id: u64,
559    ) -> PolarsResult<()> {
560        Ok(())
561    }
562
563    unsafe fn update_groups(
564        &mut self,
565        _values: &Series,
566        _group_idxs: &[IdxSize],
567        _seq_id: u64,
568    ) -> PolarsResult<()> {
569        Ok(())
570    }
571
572    unsafe fn combine(
573        &mut self,
574        _other: &dyn GroupedReduction,
575        _group_idxs: &[IdxSize],
576    ) -> PolarsResult<()> {
577        Ok(())
578    }
579
580    unsafe fn gather_combine(
581        &mut self,
582        _other: &dyn GroupedReduction,
583        _subset: &[IdxSize],
584        _group_idxs: &[IdxSize],
585    ) -> PolarsResult<()> {
586        Ok(())
587    }
588
589    unsafe fn partition(
590        self: Box<Self>,
591        partition_sizes: &[IdxSize],
592        _partition_idxs: &[IdxSize],
593    ) -> Vec<Box<dyn GroupedReduction>> {
594        partition_sizes
595            .iter()
596            .map(|&num_groups| {
597                Box::new(Self {
598                    dtype: self.dtype.clone(),
599                    num_groups,
600                }) as _
601            })
602            .collect()
603    }
604
605    fn finalize(&mut self) -> PolarsResult<Series> {
606        let num_groups = self.num_groups;
607        self.num_groups = 0;
608        Ok(Series::full_null(
609            PlSmallStr::EMPTY,
610            num_groups as usize,
611            &self.dtype,
612        ))
613    }
614
615    fn as_any(&self) -> &dyn Any {
616        self
617    }
618}