// polars_expr/reduce/mod.rs

#![allow(unsafe_op_in_unsafe_fn)]
mod any_all;
#[cfg(feature = "approx_unique")]
mod approx_n_unique;
#[cfg(feature = "bitwise")]
mod bitwise;
mod convert;
mod count;
mod first_last;
mod first_last_nonnull;
mod len;
mod mean;
mod min_max;
mod min_max_by;
mod sum;
mod var_std;

use std::any::Any;
use std::borrow::Cow;
use std::marker::PhantomData;

use arrow::array::{Array, PrimitiveArray, StaticArray};
use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
pub use convert::into_reduction;
pub use min_max::{new_max_reduction, new_min_reduction};
use polars_core::prelude::*;

use crate::EvictIdx;

/// A reduction with groups.
///
/// Each group has its own reduction state that values can be aggregated into.
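///
/// A rough usage sketch (hypothetical driver code, not part of this module):
/// the caller sizes the state, aggregates values per group, and finalizes.
///
/// ```ignore
/// // Assume `red` is a `Box<dyn GroupedReduction>` (e.g. obtained via
/// // `into_reduction`) and `group_columns[g]` holds the values of group `g`.
/// red.resize(num_groups);
/// for g in 0..num_groups {
///     // Aggregate all values of group `g` into its reduction state.
///     red.update_group(&[&group_columns[g as usize]], g, 0)?;
/// }
/// // One finalized value per group; the number of groups resets to 0.
/// let per_group: Series = red.finalize()?;
/// ```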
pub trait GroupedReduction: Any + Send + Sync {
    /// Returns a new empty reduction.
    fn new_empty(&self) -> Box<dyn GroupedReduction>;

    /// Reserves space in this GroupedReduction for `additional` more groups.
    fn reserve(&mut self, additional: usize);

    /// Resizes this GroupedReduction to the given number of groups.
    ///
    /// While not an actual member of the trait, the safety preconditions below
    /// refer to self.num_groups() as given by the last call to this function.
    fn resize(&mut self, num_groups: IdxSize);

    /// Updates the specified group with the given values.
    ///
    /// For order-sensitive grouped reductions, seq_id can be used to resolve
    /// order between calls/multiple reductions.
    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Updates this GroupedReduction with new values. values[subset[i]] should
    /// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
    /// reductions, seq_id can be used to resolve order between calls/multiple
    /// reductions.
    ///
    /// The column MUST consist of a single chunk.
    ///
    /// # Safety
    /// The subset and group_idxs are in-bounds.
    unsafe fn update_groups_subset(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.len() < (1 << (IdxSize::BITS - 1)));
        // SAFETY: EvictIdx is a wrapper for IdxSize and has the same alignment.
        let evict_group_idxs = EvictIdx::cast_slice(group_idxs);
        self.update_groups_while_evicting(values, subset, evict_group_idxs, seq_id)
    }

    /// Updates this GroupedReduction with new values. values[subset[i]] should
    /// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
    /// reductions, seq_id can be used to resolve order between calls/multiple
    /// reductions. If group_idxs[i] has its evict bit set, the current value
    /// in the group should be evicted and reset before updating.
    ///
    /// The column MUST consist of a single chunk.
    ///
    /// # Safety
    /// The subset and group_idxs are in-bounds.
    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Combines this GroupedReduction with another. Group other[subset[i]]
    /// should be combined into group self[group_idxs[i]].
    ///
    /// # Safety
    /// subset[i] < other.num_groups() for all i.
    /// group_idxs[i] < self.num_groups() for all i.
    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()>;

    /// Takes the accumulated evicted groups.
    fn take_evictions(&mut self) -> Box<dyn GroupedReduction>;

    /// Returns the finalized value per group as a Series.
    ///
    /// After this operation the number of groups is reset to 0.
    fn finalize(&mut self) -> PolarsResult<Series>;

    /// Returns this GroupedReduction as a dyn Any.
    fn as_any(&self) -> &dyn Any;
}

// Helper traits used in the VecGroupedReduction and VecMaskGroupedReduction to
// reduce code duplication.
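/// A reduction kernel over values of a single dtype, combined into one
/// `Self::Value` accumulator per group.
///
/// As a rough sketch (hypothetical; the real implementations live in the
/// submodules such as `sum` and `mean`), a summing reducer over `Int64Type`
/// could look like:
///
/// ```ignore
/// #[derive(Clone)]
/// struct SumI64Reducer;
///
/// impl Reducer for SumI64Reducer {
///     type Dtype = Int64Type;
///     type Value = i64;
///
///     fn init(&self) -> i64 {
///         0
///     }
///     fn combine(&self, a: &mut i64, b: &i64) {
///         *a += *b;
///     }
///     fn reduce_one(&self, a: &mut i64, b: Option<i64>, _seq_id: u64) {
///         *a += b.unwrap_or(0);
///     }
///     fn reduce_ca(&self, v: &mut i64, ca: &Int64Chunked, _seq_id: u64) {
///         *v += ca.sum().unwrap_or(0);
///     }
///     fn finish(&self, v: Vec<i64>, m: Option<Bitmap>, dtype: &DataType) -> PolarsResult<Series> {
///         let arr = Box::new(PrimitiveArray::from_vec(v).with_validity(m));
///         Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
///     }
/// }
/// ```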
pub trait Reducer: Send + Sync + Clone + 'static {
    type Dtype: PolarsPhysicalType;
    type Value: Clone + Send + Sync + 'static;
    fn init(&self) -> Self::Value;
    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Borrowed(s)
    }
    fn combine(&self, a: &mut Self::Value, b: &Self::Value);
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        seq_id: u64,
    );
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series>;
}

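/// A stateless numeric reduction, defined by an identity value, a binary
/// combine step, and a per-ChunkedArray fast path.
///
/// A minimal sketch (hypothetical; the real min/max reductions live in
/// `min_max`):
///
/// ```ignore
/// struct MaxI64;
///
/// impl NumericReduction for MaxI64 {
///     type Dtype = Int64Type;
///
///     fn init() -> i64 {
///         i64::MIN
///     }
///     fn combine(a: i64, b: i64) -> i64 {
///         a.max(b)
///     }
///     fn reduce_ca(ca: &Int64Chunked) -> Option<i64> {
///         ca.max()
///     }
/// }
/// ```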
pub trait NumericReduction: Send + Sync + 'static {
    type Dtype: PolarsNumericType;
    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
    fn combine(
        a: <Self::Dtype as PolarsNumericType>::Native,
        b: <Self::Dtype as PolarsNumericType>::Native,
    ) -> <Self::Dtype as PolarsNumericType>::Native;
    fn reduce_ca(
        ca: &ChunkedArray<Self::Dtype>,
    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
}

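/// Adapts a [`NumericReduction`] into a [`Reducer`] operating on the physical
/// representation of the input column.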
struct NumReducer<R: NumericReduction>(PhantomData<R>);
impl<R: NumericReduction> NumReducer<R> {
    fn new() -> Self {
        Self(PhantomData)
    }
}
impl<R: NumericReduction> Clone for NumReducer<R> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}

impl<R: NumericReduction> Reducer for NumReducer<R> {
    type Dtype = <R as NumericReduction>::Dtype;
    type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;

    #[inline(always)]
    fn init(&self) -> Self::Value {
        <R as NumericReduction>::init()
    }

    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        s.to_physical_repr()
    }

    #[inline(always)]
    fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
        *a = <R as NumericReduction>::combine(*a, *b);
    }

    #[inline(always)]
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        _seq_id: u64,
    ) {
        if let Some(b) = b {
            *a = <R as NumericReduction>::combine(*a, b);
        }
    }

    #[inline(always)]
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
        if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
            *v = <R as NumericReduction>::combine(*v, r);
        }
    }

    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series> {
        let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
        Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
    }
}

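/// A [`GroupedReduction`] that stores one `R::Value` per group in a dense
/// `Vec`, without a validity mask: groups that never receive a value finalize
/// to the reducer's `init()` value.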
pub struct VecGroupedReduction<R: Reducer> {
    values: Vec<R::Value>,
    evicted_values: Vec<R::Value>,
    in_dtype: DataType,
    reducer: R,
}

impl<R: Reducer> VecGroupedReduction<R> {
    pub fn new(in_dtype: DataType, reducer: R) -> Self {
        Self {
            values: Vec::new(),
            evicted_values: Vec::new(),
            in_dtype,
            reducer,
        }
    }
}

impl<R> GroupedReduction for VecGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: Vec::new(),
            evicted_values: Vec::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, self.reducer.init());
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.len() == 1);
        let values = values[0];
        assert!(values.dtype() == &self.in_dtype);
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            if values.has_nulls() {
                for (i, g) in subset.iter().zip(group_idxs) {
                    let ov = arr.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(g.idx());
                    if g.should_evict() {
                        let old = core::mem::replace(grp, self.reducer.init());
                        self.evicted_values.push(old);
                    }
                    self.reducer.reduce_one(grp, ov, seq_id);
                }
            } else {
                for (i, g) in subset.iter().zip(group_idxs) {
                    let v = arr.value_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(g.idx());
                    if g.should_evict() {
                        let old = core::mem::replace(grp, self.reducer.init());
                        self.evicted_values.push(old);
                    }
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let v = other.values.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(*g as usize);
                self.reducer.combine(grp, v);
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            evicted_values: Vec::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        self.reducer.finish(v, None, &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

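/// Like [`VecGroupedReduction`], but with a validity mask that tracks whether
/// each group has seen any non-null value, so that empty groups finalize to
/// null.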
pub struct VecMaskGroupedReduction<R: Reducer> {
    values: Vec<R::Value>,
    mask: MutableBitmap,
    evicted_values: Vec<R::Value>,
    evicted_mask: BitmapBuilder,
    in_dtype: DataType,
    reducer: R,
}

impl<R: Reducer> VecMaskGroupedReduction<R> {
    fn new(in_dtype: DataType, reducer: R) -> Self {
        Self {
            values: Vec::new(),
            mask: MutableBitmap::new(),
            evicted_values: Vec::new(),
            evicted_mask: BitmapBuilder::new(),
            in_dtype,
            reducer,
        }
    }
}

impl<R> GroupedReduction for VecMaskGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::new(self.in_dtype.clone(), self.reducer.clone()))
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
        self.mask.reserve(additional)
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, self.reducer.init());
        self.mask.resize(num_groups as usize, false);
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &self.in_dtype);
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        if ca.len() != ca.null_count() {
            self.mask.set(group_idx as usize, true);
        }
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let ov = arr.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(g.idx());
                if g.should_evict() {
                    self.evicted_values
                        .push(core::mem::replace(grp, self.reducer.init()));
                    self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
                    self.mask.set_unchecked(g.idx(), false);
                }
                if let Some(v) = ov {
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                    self.mask.set_unchecked(g.idx(), true);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let o = other.mask.get_unchecked(*i as usize);
                if o {
                    let v = other.values.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(*g as usize);
                    self.reducer.combine(grp, v);
                    self.mask.set_unchecked(*g as usize, true);
                }
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            mask: core::mem::take(&mut self.evicted_mask).into_mut(),
            evicted_values: Vec::new(),
            evicted_mask: BitmapBuilder::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        let m = core::mem::take(&mut self.mask);
        self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

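/// A [`GroupedReduction`] for null-dtype inputs: it only tracks the number of
/// groups (and evictions) and finalizes to `output` repeated once per group.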
#[derive(Clone)]
pub struct NullGroupedReduction {
    num_groups: IdxSize,
    num_evictions: IdxSize,
    output: Scalar,
}

impl NullGroupedReduction {
    pub fn new(output: Scalar) -> Self {
        Self {
            num_groups: 0,
            num_evictions: 0,
            output,
        }
    }
}

impl GroupedReduction for NullGroupedReduction {
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::new(self.output.clone()))
    }

    fn reserve(&mut self, _additional: usize) {}

    fn resize(&mut self, num_groups: IdxSize) {
        self.num_groups = num_groups;
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        _group_idx: IdxSize,
        _seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype().is_null());
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        _seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype().is_null());
        assert!(subset.len() == group_idxs.len());
        for g in group_idxs {
            self.num_evictions += g.should_evict() as IdxSize;
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let _other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(subset.len() == group_idxs.len());
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            num_groups: core::mem::replace(&mut self.num_evictions, 0),
            num_evictions: 0,
            output: self.output.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let length = core::mem::replace(&mut self.num_groups, 0) as usize;
        let s = self.output.clone().into_series(PlSmallStr::EMPTY);
        Ok(s.new_from_index(0, length))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}