// polars_expr/reduce/mod.rs

#![allow(unsafe_op_in_unsafe_fn)]
mod any_all;
#[cfg(feature = "approx_unique")]
mod approx_n_unique;
#[cfg(feature = "bitwise")]
mod bitwise;
mod convert;
mod count;
mod first_last;
mod len;
mod mean;
mod min_max;
mod sum;
mod var_std;

use std::any::Any;
use std::borrow::Cow;
use std::marker::PhantomData;

use arrow::array::{Array, PrimitiveArray, StaticArray};
use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
pub use convert::into_reduction;
pub use min_max::{new_max_reduction, new_min_reduction};
use polars_core::prelude::*;

use crate::EvictIdx;

/// A reduction with groups.
///
/// Each group has its own reduction state that values can be aggregated into.
pub trait GroupedReduction: Any + Send + Sync {
    /// Returns a new empty reduction.
    fn new_empty(&self) -> Box<dyn GroupedReduction>;

    /// Reserves space in this GroupedReduction for an additional number of groups.
    fn reserve(&mut self, additional: usize);

    /// Resizes this GroupedReduction to the given number of groups.
    ///
    /// Although `num_groups()` is not an actual member of the trait, the safety
    /// preconditions below refer to `self.num_groups()` as set by the most recent
    /// call to this function.
    fn resize(&mut self, num_groups: IdxSize);

    /// Updates the specified group with the given values.
    ///
    /// For order-sensitive grouped reductions, `seq_id` can be used to resolve
    /// order between calls/multiple reductions.
    fn update_group(
        &mut self,
        values: &Column,
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Updates this GroupedReduction with new values. `values[subset[i]]` should
    /// be added to reduction `self[group_idxs[i]]`. For order-sensitive grouped
    /// reductions, `seq_id` can be used to resolve order between calls/multiple
    /// reductions.
    ///
    /// # Safety
    /// The subset and group_idxs must be in-bounds.
    unsafe fn update_groups_subset(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.len() < (1 << (IdxSize::BITS - 1)));
        let evict_group_idxs = core::mem::transmute::<&[IdxSize], &[EvictIdx]>(group_idxs);
        self.update_groups_while_evicting(values, subset, evict_group_idxs, seq_id)
    }

    /// Updates this GroupedReduction with new values. `values[subset[i]]` should
    /// be added to reduction `self[group_idxs[i]]`. For order-sensitive grouped
    /// reductions, `seq_id` can be used to resolve order between calls/multiple
    /// reductions. If `group_idxs[i]` has its evict bit set, the current value of
    /// that group should be evicted and reset before updating.
    ///
    /// # Safety
    /// The subset and group_idxs must be in-bounds.
    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Combines this GroupedReduction with another. Group `other[subset[i]]`
    /// should be combined into group `self[group_idxs[i]]`.
    ///
    /// # Safety
    /// `subset[i] < other.num_groups()` for all `i`.
    /// `group_idxs[i] < self.num_groups()` for all `i`.
    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()>;

    /// Takes the accumulated evicted groups.
    fn take_evictions(&mut self) -> Box<dyn GroupedReduction>;

    /// Returns the finalized value per group as a Series.
    ///
    /// After this operation the number of groups is reset to 0.
    fn finalize(&mut self) -> PolarsResult<Series>;

    /// Returns this GroupedReduction as a `dyn Any`.
    fn as_any(&self) -> &dyn Any;
}
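
// Illustrative sketch (not part of the real API surface): the typical call
// sequence for a `GroupedReduction`, assuming some boxed reduction `red` was
// obtained elsewhere (e.g. via `into_reduction` in `convert.rs`, whose exact
// signature is not shown here).
//
//     // One state per group; `resize` must be called before any updates.
//     red.resize(num_groups);
//     // Aggregate an entire column into a single group...
//     red.update_group(&column, /* group_idx */ 0, /* seq_id */ 0)?;
//     // ...or scatter selected rows into their groups. SAFETY: `subset` and
//     // `group_idxs` must be in-bounds per the trait documentation.
//     unsafe { red.update_groups_subset(&column, &subset, &group_idxs, 0)? };
//     // Produce one finalized value per group; the group count resets to 0.
//     let per_group: Series = red.finalize()?;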

// Helper traits used in the VecGroupedReduction and VecMaskGroupedReduction to
// reduce code duplication.
pub trait Reducer: Send + Sync + Clone + 'static {
    type Dtype: PolarsPhysicalType;
    type Value: Clone + Send + Sync + 'static;
    fn init(&self) -> Self::Value;
    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Borrowed(s)
    }
    fn combine(&self, a: &mut Self::Value, b: &Self::Value);
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        seq_id: u64,
    );
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series>;
}

pub trait NumericReduction: Send + Sync + 'static {
    type Dtype: PolarsNumericType;
    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
    fn combine(
        a: <Self::Dtype as PolarsNumericType>::Native,
        b: <Self::Dtype as PolarsNumericType>::Native,
    ) -> <Self::Dtype as PolarsNumericType>::Native;
    fn reduce_ca(
        ca: &ChunkedArray<Self::Dtype>,
    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
}
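
// Illustrative sketch: a hypothetical `NumericReduction` for an i64 sum, showing
// how its associated items line up with `NumReducer` below. The real sum reduction
// lives in `sum.rs`; nothing here is part of that implementation.
//
//     struct I64SumReduction;
//     impl NumericReduction for I64SumReduction {
//         type Dtype = Int64Type;
//         fn init() -> i64 { 0 }
//         fn combine(a: i64, b: i64) -> i64 { a.wrapping_add(b) }
//         fn reduce_ca(ca: &ChunkedArray<Int64Type>) -> Option<i64> {
//             ca.sum() // null-aware reduction over the whole chunked array
//         }
//     }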

/// Adapts a [`NumericReduction`] to the [`Reducer`] interface.
struct NumReducer<R: NumericReduction>(PhantomData<R>);
impl<R: NumericReduction> NumReducer<R> {
    fn new() -> Self {
        Self(PhantomData)
    }
}
impl<R: NumericReduction> Clone for NumReducer<R> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}

impl<R: NumericReduction> Reducer for NumReducer<R> {
    type Dtype = <R as NumericReduction>::Dtype;
    type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;

    #[inline(always)]
    fn init(&self) -> Self::Value {
        <R as NumericReduction>::init()
    }

    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        s.to_physical_repr()
    }

    #[inline(always)]
    fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
        *a = <R as NumericReduction>::combine(*a, *b);
    }

    #[inline(always)]
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        _seq_id: u64,
    ) {
        if let Some(b) = b {
            *a = <R as NumericReduction>::combine(*a, b);
        }
    }

    #[inline(always)]
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
        if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
            *v = <R as NumericReduction>::combine(*v, r);
        }
    }

    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series> {
        let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
        Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
    }
}

/// A grouped reduction that stores one `R::Value` per group, with no separate
/// validity mask; null handling is left to the `Reducer`.
pub struct VecGroupedReduction<R: Reducer> {
    values: Vec<R::Value>,
    evicted_values: Vec<R::Value>,
    in_dtype: DataType,
    reducer: R,
}

impl<R: Reducer> VecGroupedReduction<R> {
    pub fn new(in_dtype: DataType, reducer: R) -> Self {
        Self {
            values: Vec::new(),
            evicted_values: Vec::new(),
            in_dtype,
            reducer,
        }
    }
}
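
// Illustrative sketch: pairing the hypothetical `I64SumReduction` from the comment
// after `NumericReduction` above with `VecGroupedReduction`. This is only an
// assumption-labelled example of how the pieces are meant to compose.
//
//     let mut red = VecGroupedReduction::new(
//         DataType::Int64,
//         NumReducer::<I64SumReduction>::new(),
//     );
//     red.resize(n_groups);           // allocate one i64 state per group
//     // ...feed rows via `update_group` / `update_groups_subset`...
//     let sums: Series = red.finalize()?;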

impl<R> GroupedReduction for VecGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: Vec::new(),
            evicted_values: Vec::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, self.reducer.init());
    }

    fn update_group(
        &mut self,
        values: &Column,
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype() == &self.in_dtype);
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            if values.has_nulls() {
                for (i, g) in subset.iter().zip(group_idxs) {
                    let ov = arr.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(g.idx());
                    if g.should_evict() {
                        let old = core::mem::replace(grp, self.reducer.init());
                        self.evicted_values.push(old);
                    }
                    self.reducer.reduce_one(grp, ov, seq_id);
                }
            } else {
                for (i, g) in subset.iter().zip(group_idxs) {
                    let v = arr.value_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(g.idx());
                    if g.should_evict() {
                        let old = core::mem::replace(grp, self.reducer.init());
                        self.evicted_values.push(old);
                    }
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let v = other.values.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(*g as usize);
                self.reducer.combine(grp, v);
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            evicted_values: Vec::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        self.reducer.finish(v, None, &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Like [`VecGroupedReduction`], but additionally tracks per group whether any
/// non-null value was observed, so groups that never saw one can finalize to null.
pub struct VecMaskGroupedReduction<R: Reducer> {
    values: Vec<R::Value>,
    mask: MutableBitmap,
    evicted_values: Vec<R::Value>,
    evicted_mask: BitmapBuilder,
    in_dtype: DataType,
    reducer: R,
}

impl<R: Reducer> VecMaskGroupedReduction<R> {
    fn new(in_dtype: DataType, reducer: R) -> Self {
        Self {
            values: Vec::new(),
            mask: MutableBitmap::new(),
            evicted_values: Vec::new(),
            evicted_mask: BitmapBuilder::new(),
            in_dtype,
            reducer,
        }
    }
}

impl<R> GroupedReduction for VecMaskGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::new(self.in_dtype.clone(), self.reducer.clone()))
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
        self.mask.reserve(additional)
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, self.reducer.init());
        self.mask.resize(num_groups as usize, false);
    }

    fn update_group(
        &mut self,
        values: &Column,
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype() == &self.in_dtype);
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        if ca.len() != ca.null_count() {
            self.mask.set(group_idx as usize, true);
        }
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let ov = arr.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(g.idx());
                if g.should_evict() {
                    self.evicted_values
                        .push(core::mem::replace(grp, self.reducer.init()));
                    self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
                    self.mask.set_unchecked(g.idx(), false);
                }
                if let Some(v) = ov {
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                    self.mask.set_unchecked(g.idx(), true);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let o = other.mask.get_unchecked(*i as usize);
                if o {
                    let v = other.values.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(*g as usize);
                    self.reducer.combine(grp, v);
                    self.mask.set_unchecked(*g as usize, true);
                }
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            mask: core::mem::take(&mut self.evicted_mask).into_mut(),
            evicted_values: Vec::new(),
            evicted_mask: BitmapBuilder::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        let m = core::mem::take(&mut self.mask);
        self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
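
// Illustrative sketch: the effect of the validity mask. A group that never receives
// a non-null value keeps its mask bit unset and therefore finalizes to null, unlike
// `VecGroupedReduction`, whose `finish` receives no mask at all.
//
//     red.resize(2);
//     red.update_group(&col, 0, 0)?; // only group 0 sees (non-null) values
//     let out = red.finalize()?;     // out[0] = reduced value, out[1] = null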

/// A grouped reduction over Null-typed input: it only tracks the group and
/// eviction counts, and finalizes by broadcasting the precomputed `output`
/// scalar, one row per group.
#[derive(Clone)]
pub struct NullGroupedReduction {
    num_groups: IdxSize,
    num_evictions: IdxSize,
    output: Scalar,
}

impl NullGroupedReduction {
    pub fn new(output: Scalar) -> Self {
        Self {
            num_groups: 0,
            num_evictions: 0,
            output,
        }
    }
}

impl GroupedReduction for NullGroupedReduction {
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::new(self.output.clone()))
    }

    fn reserve(&mut self, _additional: usize) {}

    fn resize(&mut self, num_groups: IdxSize) {
        self.num_groups = num_groups;
    }

    fn update_group(
        &mut self,
        values: &Column,
        _group_idx: IdxSize,
        _seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype().is_null());
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        _seq_id: u64,
    ) -> PolarsResult<()> {
        assert!(values.dtype().is_null());
        assert!(subset.len() == group_idxs.len());
        for g in group_idxs {
            self.num_evictions += g.should_evict() as IdxSize;
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let _other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(subset.len() == group_idxs.len());
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            num_groups: core::mem::replace(&mut self.num_evictions, 0),
            num_evictions: 0,
            output: self.output.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let length = core::mem::replace(&mut self.num_groups, 0) as usize;
        let s = self.output.clone().into_series(PlSmallStr::EMPTY);
        Ok(s.new_from_index(0, length))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
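
// Illustrative sketch: `NullGroupedReduction` ignores its (all-null) input entirely
// and just broadcasts a precomputed scalar, one row per group. `out_scalar` below
// stands in for whatever `Scalar` the caller decides a null input should reduce to.
//
//     let mut red = NullGroupedReduction::new(out_scalar);
//     red.resize(3);
//     let out = red.finalize()?; // three rows, all equal to `out_scalar`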