Skip to main content

polars_expr/reduce/
mod.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2mod any_all;
3#[cfg(feature = "approx_unique")]
4mod approx_n_unique;
5#[cfg(feature = "bitwise")]
6mod bitwise;
7mod convert;
8mod count;
9#[cfg(feature = "cov")]
10mod cov;
11mod first_last;
12mod first_last_nonnull;
13mod has_nulls;
14mod implode;
15mod is_empty;
16mod mean;
17mod min_max;
18mod min_max_by;
19#[cfg(feature = "moment")]
20mod skew_kurtosis;
21mod sum;
22mod var_std;
23
24use std::any::Any;
25use std::borrow::Cow;
26use std::marker::PhantomData;
27
28use arrow::array::{Array, PrimitiveArray, StaticArray};
29use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
30pub use convert::into_reduction;
31pub use min_max::{new_max_reduction, new_min_reduction};
32use polars_core::prelude::*;
33
34use crate::EvictIdx;
35
36/// A reduction with groups.
37///
38/// Each group has its own reduction state that values can be aggregated into.
39pub trait GroupedReduction: Any + Send + Sync {
40    /// Returns a new empty reduction.
41    fn new_empty(&self) -> Box<dyn GroupedReduction>;
42
43    /// Reserves space in this GroupedReduction for an additional number of groups.
44    fn reserve(&mut self, additional: usize);
45
46    /// Resizes this GroupedReduction to the given number of groups.
47    ///
48    /// While not an actual member of the trait, the safety preconditions below
49    /// refer to self.num_groups() as given by the last call of this function.
50    fn resize(&mut self, num_groups: IdxSize);
51
52    /// Updates the specified group with the given values.
53    ///
54    /// For order-sensitive grouped reductions, seq_id can be used to resolve
55    /// order between calls/multiple reductions.
56    fn update_group(
57        &mut self,
58        values: &[&Column],
59        group_idx: IdxSize,
60        seq_id: u64,
61    ) -> PolarsResult<()>;
62
63    /// Updates this GroupedReduction with new values. values[subset[i]] should
64    /// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
65    /// reductions, seq_id can be used to resolve order between calls/multiple
66    /// reductions.
67    ///
68    /// The column MUST consist of single chunk.
69    ///
70    /// # Safety
71    /// The subset and group_idxs are in-bounds.
72    unsafe fn update_groups_subset(
73        &mut self,
74        values: &[&Column],
75        subset: &[IdxSize],
76        group_idxs: &[IdxSize],
77        seq_id: u64,
78    ) -> PolarsResult<()> {
79        assert!(values.len() < (1 << (IdxSize::BITS - 1)));
80        // SAFETY: EvictIdx is a wrapper for IdxSize and has same alignment.
81        let evict_group_idxs = EvictIdx::cast_slice(group_idxs);
82        self.update_groups_while_evicting(values, subset, evict_group_idxs, seq_id)
83    }
84
85    /// Updates this GroupedReduction with new values. values[subset[i]] should
86    /// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
87    /// reductions, seq_id can be used to resolve order between calls/multiple
88    /// reductions. If the group_idxs[i] has its evict bit set the current value
89    /// in the group should be evicted and reset before updating.
90    ///
91    /// The column MUST consist of single chunk.
92    ///
93    /// # Safety
94    /// The subset and group_idxs are in-bounds.
95    unsafe fn update_groups_while_evicting(
96        &mut self,
97        values: &[&Column],
98        subset: &[IdxSize],
99        group_idxs: &[EvictIdx],
100        seq_id: u64,
101    ) -> PolarsResult<()>;
102
103    /// Combines this GroupedReduction with another. Group other[subset[i]]
104    /// should be combined into group self[group_idxs[i]].
105    ///
106    /// # Safety
107    /// subset[i] < other.num_groups() for all i.
108    /// group_idxs[i] < self.num_groups() for all i.
109    unsafe fn combine_subset(
110        &mut self,
111        other: &dyn GroupedReduction,
112        subset: &[IdxSize],
113        group_idxs: &[IdxSize],
114    ) -> PolarsResult<()>;
115
116    /// Take the accumulated evicted groups.
117    fn take_evictions(&mut self) -> Box<dyn GroupedReduction>;
118
119    /// Returns the finalized value per group as a Series.
120    ///
121    /// After this operation the number of groups is reset to 0.
122    fn finalize(&mut self) -> PolarsResult<Series>;
123
124    /// Returns this GroupedReduction as a dyn Any.
125    fn as_any(&self) -> &dyn Any;
126}
127
128// Helper traits used in the VecGroupedReduction and VecMaskGroupedReduction to
129// reduce code duplication.
130pub trait Reducer: Send + Sync + Clone + 'static {
131    type Dtype: PolarsPhysicalType;
132    type Value: Clone + Send + Sync + 'static;
133    fn init(&self) -> Self::Value;
134    #[inline(always)]
135    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
136        Cow::Borrowed(s)
137    }
138    fn combine(&self, a: &mut Self::Value, b: &Self::Value);
139    fn reduce_one(
140        &self,
141        a: &mut Self::Value,
142        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
143        seq_id: u64,
144    );
145    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
146    fn finish(
147        &self,
148        v: Vec<Self::Value>,
149        m: Option<Bitmap>,
150        dtype: &DataType,
151    ) -> PolarsResult<Series>;
152}
153
154pub trait NumericReduction: Send + Sync + 'static {
155    type Dtype: PolarsNumericType;
156    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
157    fn combine(
158        a: <Self::Dtype as PolarsNumericType>::Native,
159        b: <Self::Dtype as PolarsNumericType>::Native,
160    ) -> <Self::Dtype as PolarsNumericType>::Native;
161    fn reduce_ca(
162        ca: &ChunkedArray<Self::Dtype>,
163    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
164}
165
166struct NumReducer<R: NumericReduction>(PhantomData<R>);
167impl<R: NumericReduction> NumReducer<R> {
168    fn new() -> Self {
169        Self(PhantomData)
170    }
171}
172impl<R: NumericReduction> Clone for NumReducer<R> {
173    fn clone(&self) -> Self {
174        Self(PhantomData)
175    }
176}
177
178impl<R: NumericReduction> Reducer for NumReducer<R> {
179    type Dtype = <R as NumericReduction>::Dtype;
180    type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;
181
182    #[inline(always)]
183    fn init(&self) -> Self::Value {
184        <R as NumericReduction>::init()
185    }
186
187    #[inline(always)]
188    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
189        s.to_physical_repr()
190    }
191
192    #[inline(always)]
193    fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
194        *a = <R as NumericReduction>::combine(*a, *b);
195    }
196
197    #[inline(always)]
198    fn reduce_one(
199        &self,
200        a: &mut Self::Value,
201        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
202        _seq_id: u64,
203    ) {
204        if let Some(b) = b {
205            *a = <R as NumericReduction>::combine(*a, b);
206        }
207    }
208
209    #[inline(always)]
210    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
211        if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
212            *v = <R as NumericReduction>::combine(*v, r);
213        }
214    }
215
216    fn finish(
217        &self,
218        v: Vec<Self::Value>,
219        m: Option<Bitmap>,
220        dtype: &DataType,
221    ) -> PolarsResult<Series> {
222        let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
223        Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
224    }
225}
226
227pub struct VecGroupedReduction<R: Reducer> {
228    values: Vec<R::Value>,
229    evicted_values: Vec<R::Value>,
230    in_dtype: DataType,
231    reducer: R,
232}
233
234impl<R: Reducer> VecGroupedReduction<R> {
235    pub fn new(in_dtype: DataType, reducer: R) -> Self {
236        Self {
237            values: Vec::new(),
238            evicted_values: Vec::new(),
239            in_dtype,
240            reducer,
241        }
242    }
243}
244
245impl<R> GroupedReduction for VecGroupedReduction<R>
246where
247    R: Reducer,
248{
249    fn new_empty(&self) -> Box<dyn GroupedReduction> {
250        Box::new(Self {
251            values: Vec::new(),
252            evicted_values: Vec::new(),
253            in_dtype: self.in_dtype.clone(),
254            reducer: self.reducer.clone(),
255        })
256    }
257
258    fn reserve(&mut self, additional: usize) {
259        self.values.reserve(additional);
260    }
261
262    fn resize(&mut self, num_groups: IdxSize) {
263        self.values.resize(num_groups as usize, self.reducer.init());
264    }
265
266    fn update_group(
267        &mut self,
268        values: &[&Column],
269        group_idx: IdxSize,
270        seq_id: u64,
271    ) -> PolarsResult<()> {
272        assert!(values.len() == 1);
273        let values = values[0];
274        assert!(values.dtype() == &self.in_dtype);
275        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
276        let values = values.as_materialized_series(); // @scalar-opt
277        let values = self.reducer.cast_series(values);
278        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
279        self.reducer
280            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
281        Ok(())
282    }
283
284    unsafe fn update_groups_while_evicting(
285        &mut self,
286        values: &[&Column],
287        subset: &[IdxSize],
288        group_idxs: &[EvictIdx],
289        seq_id: u64,
290    ) -> PolarsResult<()> {
291        let &[values] = values else { unreachable!() };
292        assert!(values.dtype() == &self.in_dtype);
293        assert!(subset.len() == group_idxs.len());
294        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
295        let values = values.as_materialized_series(); // @scalar-opt
296        let values = self.reducer.cast_series(values);
297        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
298        let arr = ca.downcast_as_array();
299        unsafe {
300            // SAFETY: indices are in-bounds guaranteed by trait.
301            if values.has_nulls() {
302                for (i, g) in subset.iter().zip(group_idxs) {
303                    let ov = arr.get_unchecked(*i as usize);
304                    let grp = self.values.get_unchecked_mut(g.idx());
305                    if g.should_evict() {
306                        let old = core::mem::replace(grp, self.reducer.init());
307                        self.evicted_values.push(old);
308                    }
309                    self.reducer.reduce_one(grp, ov, seq_id);
310                }
311            } else {
312                for (i, g) in subset.iter().zip(group_idxs) {
313                    let v = arr.value_unchecked(*i as usize);
314                    let grp = self.values.get_unchecked_mut(g.idx());
315                    if g.should_evict() {
316                        let old = core::mem::replace(grp, self.reducer.init());
317                        self.evicted_values.push(old);
318                    }
319                    self.reducer.reduce_one(grp, Some(v), seq_id);
320                }
321            }
322        }
323        Ok(())
324    }
325
326    unsafe fn combine_subset(
327        &mut self,
328        other: &dyn GroupedReduction,
329        subset: &[IdxSize],
330        group_idxs: &[IdxSize],
331    ) -> PolarsResult<()> {
332        let other = other.as_any().downcast_ref::<Self>().unwrap();
333        assert!(self.in_dtype == other.in_dtype);
334        assert!(subset.len() == group_idxs.len());
335        unsafe {
336            // SAFETY: indices are in-bounds guaranteed by trait.
337            for (i, g) in subset.iter().zip(group_idxs) {
338                let v = other.values.get_unchecked(*i as usize);
339                let grp = self.values.get_unchecked_mut(*g as usize);
340                self.reducer.combine(grp, v);
341            }
342        }
343        Ok(())
344    }
345
346    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
347        Box::new(Self {
348            values: core::mem::take(&mut self.evicted_values),
349            evicted_values: Vec::new(),
350            in_dtype: self.in_dtype.clone(),
351            reducer: self.reducer.clone(),
352        })
353    }
354
355    fn finalize(&mut self) -> PolarsResult<Series> {
356        let v = core::mem::take(&mut self.values);
357        self.reducer.finish(v, None, &self.in_dtype)
358    }
359
360    fn as_any(&self) -> &dyn Any {
361        self
362    }
363}
364
365pub struct VecMaskGroupedReduction<R: Reducer> {
366    values: Vec<R::Value>,
367    mask: MutableBitmap,
368    evicted_values: Vec<R::Value>,
369    evicted_mask: BitmapBuilder,
370    in_dtype: DataType,
371    reducer: R,
372}
373
374impl<R: Reducer> VecMaskGroupedReduction<R> {
375    fn new(in_dtype: DataType, reducer: R) -> Self {
376        Self {
377            values: Vec::new(),
378            mask: MutableBitmap::new(),
379            evicted_values: Vec::new(),
380            evicted_mask: BitmapBuilder::new(),
381            in_dtype,
382            reducer,
383        }
384    }
385}
386
387impl<R> GroupedReduction for VecMaskGroupedReduction<R>
388where
389    R: Reducer,
390{
391    fn new_empty(&self) -> Box<dyn GroupedReduction> {
392        Box::new(Self::new(self.in_dtype.clone(), self.reducer.clone()))
393    }
394
395    fn reserve(&mut self, additional: usize) {
396        self.values.reserve(additional);
397        self.mask.reserve(additional)
398    }
399
400    fn resize(&mut self, num_groups: IdxSize) {
401        self.values.resize(num_groups as usize, self.reducer.init());
402        self.mask.resize(num_groups as usize, false);
403    }
404
405    fn update_group(
406        &mut self,
407        values: &[&Column],
408        group_idx: IdxSize,
409        seq_id: u64,
410    ) -> PolarsResult<()> {
411        let &[values] = values else { unreachable!() };
412        assert!(values.dtype() == &self.in_dtype);
413        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
414        let values = values.as_materialized_series(); // @scalar-opt
415        let values = self.reducer.cast_series(values);
416        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
417        self.reducer
418            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
419        if ca.len() != ca.null_count() {
420            self.mask.set(group_idx as usize, true);
421        }
422        Ok(())
423    }
424
425    unsafe fn update_groups_while_evicting(
426        &mut self,
427        values: &[&Column],
428        subset: &[IdxSize],
429        group_idxs: &[EvictIdx],
430        seq_id: u64,
431    ) -> PolarsResult<()> {
432        let &[values] = values else { unreachable!() };
433        assert!(values.dtype() == &self.in_dtype);
434        assert!(subset.len() == group_idxs.len());
435        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
436        let values = values.as_materialized_series(); // @scalar-opt
437        let values = self.reducer.cast_series(values);
438        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
439        let arr = ca.downcast_as_array();
440        unsafe {
441            // SAFETY: indices are in-bounds guaranteed by trait.
442            for (i, g) in subset.iter().zip(group_idxs) {
443                let ov = arr.get_unchecked(*i as usize);
444                let grp = self.values.get_unchecked_mut(g.idx());
445                if g.should_evict() {
446                    self.evicted_values
447                        .push(core::mem::replace(grp, self.reducer.init()));
448                    self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
449                    self.mask.set_unchecked(g.idx(), false);
450                }
451                if let Some(v) = ov {
452                    self.reducer.reduce_one(grp, Some(v), seq_id);
453                    self.mask.set_unchecked(g.idx(), true);
454                }
455            }
456        }
457        Ok(())
458    }
459
460    unsafe fn combine_subset(
461        &mut self,
462        other: &dyn GroupedReduction,
463        subset: &[IdxSize],
464        group_idxs: &[IdxSize],
465    ) -> PolarsResult<()> {
466        let other = other.as_any().downcast_ref::<Self>().unwrap();
467        assert!(self.in_dtype == other.in_dtype);
468        assert!(subset.len() == group_idxs.len());
469        unsafe {
470            // SAFETY: indices are in-bounds guaranteed by trait.
471            for (i, g) in subset.iter().zip(group_idxs) {
472                let o = other.mask.get_unchecked(*i as usize);
473                if o {
474                    let v = other.values.get_unchecked(*i as usize);
475                    let grp = self.values.get_unchecked_mut(*g as usize);
476                    self.reducer.combine(grp, v);
477                    self.mask.set_unchecked(*g as usize, true);
478                }
479            }
480        }
481        Ok(())
482    }
483
484    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
485        Box::new(Self {
486            values: core::mem::take(&mut self.evicted_values),
487            mask: core::mem::take(&mut self.evicted_mask).into_mut(),
488            evicted_values: Vec::new(),
489            evicted_mask: BitmapBuilder::new(),
490            in_dtype: self.in_dtype.clone(),
491            reducer: self.reducer.clone(),
492        })
493    }
494
495    fn finalize(&mut self) -> PolarsResult<Series> {
496        let v = core::mem::take(&mut self.values);
497        let m = core::mem::take(&mut self.mask);
498        self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
499    }
500
501    fn as_any(&self) -> &dyn Any {
502        self
503    }
504}
505
506#[derive(Clone)]
507pub struct NullGroupedReduction {
508    num_groups: IdxSize,
509    num_evictions: IdxSize,
510    output: Scalar,
511}
512
513impl NullGroupedReduction {
514    pub fn new(output: Scalar) -> Self {
515        Self {
516            num_groups: 0,
517            num_evictions: 0,
518            output,
519        }
520    }
521}
522
523impl GroupedReduction for NullGroupedReduction {
524    fn new_empty(&self) -> Box<dyn GroupedReduction> {
525        Box::new(Self::new(self.output.clone()))
526    }
527
528    fn reserve(&mut self, _additional: usize) {}
529
530    fn resize(&mut self, num_groups: IdxSize) {
531        self.num_groups = num_groups;
532    }
533
534    fn update_group(
535        &mut self,
536        values: &[&Column],
537        _group_idx: IdxSize,
538        _seq_id: u64,
539    ) -> PolarsResult<()> {
540        assert!(!values.is_empty());
541        assert!(values.iter().any(|v| v.dtype().is_null()));
542        Ok(())
543    }
544
545    unsafe fn update_groups_while_evicting(
546        &mut self,
547        values: &[&Column],
548        subset: &[IdxSize],
549        group_idxs: &[EvictIdx],
550        _seq_id: u64,
551    ) -> PolarsResult<()> {
552        assert!(!values.is_empty());
553        assert!(values.iter().any(|v| v.dtype().is_null()));
554        assert!(subset.len() == group_idxs.len());
555        for g in group_idxs {
556            self.num_evictions += g.should_evict() as IdxSize;
557        }
558        Ok(())
559    }
560
561    unsafe fn combine_subset(
562        &mut self,
563        other: &dyn GroupedReduction,
564        subset: &[IdxSize],
565        group_idxs: &[IdxSize],
566    ) -> PolarsResult<()> {
567        let _other = other.as_any().downcast_ref::<Self>().unwrap();
568        assert!(subset.len() == group_idxs.len());
569        Ok(())
570    }
571
572    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
573        Box::new(Self {
574            num_groups: core::mem::replace(&mut self.num_evictions, 0),
575            num_evictions: 0,
576            output: self.output.clone(),
577        })
578    }
579
580    fn finalize(&mut self) -> PolarsResult<Series> {
581        let length = core::mem::replace(&mut self.num_groups, 0) as usize;
582        let s = self.output.clone().into_series(PlSmallStr::EMPTY);
583        Ok(s.new_from_index(0, length))
584    }
585
586    fn as_any(&self) -> &dyn Any {
587        self
588    }
589}