1#![allow(unsafe_op_in_unsafe_fn)]
2mod any_all;
3#[cfg(feature = "approx_unique")]
4mod approx_n_unique;
5#[cfg(feature = "bitwise")]
6mod bitwise;
7mod convert;
8mod count;
9mod first_last;
10mod first_last_nonnull;
11mod len;
12mod mean;
13mod min_max;
14mod min_max_by;
15mod sum;
16mod var_std;
17
18use std::any::Any;
19use std::borrow::Cow;
20use std::marker::PhantomData;
21
22use arrow::array::{Array, PrimitiveArray, StaticArray};
23use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
24pub use convert::into_reduction;
25pub use min_max::{new_max_reduction, new_min_reduction};
26use polars_core::prelude::*;
27
28use crate::EvictIdx;
29
/// A reduction (e.g. sum, min, first) computed independently for each group.
///
/// Implementations keep one accumulator per group and support streaming
/// updates, eviction of accumulators into a side buffer, merging of partial
/// results from other instances, and final materialization into a `Series`.
pub trait GroupedReduction: Any + Send + Sync {
    /// Returns a fresh reduction of the same kind with zero groups.
    fn new_empty(&self) -> Box<dyn GroupedReduction>;

    /// Reserves capacity for at least `additional` more groups.
    fn reserve(&mut self, additional: usize);

    /// Resizes to exactly `num_groups` groups; newly created groups start
    /// at the reduction's identity value.
    fn resize(&mut self, num_groups: IdxSize);

    /// Folds all rows of `values` into the single group `group_idx`.
    ///
    /// `seq_id` is a monotone sequence number that order-sensitive
    /// reductions (e.g. first/last) can use to order updates across calls.
    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Folds row `subset[i]` of `values` into group `group_idxs[i]`, for
    /// each `i`.
    ///
    /// # Safety
    /// `subset` must be in-bounds for `values` and `group_idxs` must be
    /// in-bounds for the current number of groups.
    unsafe fn update_groups_subset(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
        seq_id: u64,
    ) -> PolarsResult<()> {
        // NOTE(review): this bounds the number of *columns*, but for the
        // cast below to be free of spurious evictions it is the *group
        // indices* that must stay below 1 << (IdxSize::BITS - 1) (the
        // evict bit) — confirm this assert checks the intended quantity.
        assert!(values.len() < (1 << (IdxSize::BITS - 1)));
        let evict_group_idxs = EvictIdx::cast_slice(group_idxs);
        self.update_groups_while_evicting(values, subset, evict_group_idxs, seq_id)
    }

    /// Like [`GroupedReduction::update_groups_subset`], but when a group
    /// index has its evict bit set, the group's current accumulator is
    /// first moved to the eviction buffer and reset to the identity value
    /// before the new value is folded in.
    ///
    /// # Safety
    /// `subset` must be in-bounds for `values`, and the index part of each
    /// entry in `group_idxs` must be in-bounds for the current number of
    /// groups.
    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Combines `other`'s accumulator `subset[i]` into our group
    /// `group_idxs[i]`, for each `i`. `other` must be the same concrete
    /// reduction type as `self`.
    ///
    /// # Safety
    /// `subset` must be in-bounds for `other`'s groups and `group_idxs`
    /// must be in-bounds for `self`'s groups.
    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()>;

    /// Moves all evicted accumulators out into a new reduction (whose
    /// groups are the evictions, in eviction order), leaving this one's
    /// eviction buffer empty.
    fn take_evictions(&mut self) -> Box<dyn GroupedReduction>;

    /// Consumes the accumulators, producing one output row per group.
    fn finalize(&mut self) -> PolarsResult<Series>;

    /// Used for downcasting in [`GroupedReduction::combine_subset`].
    fn as_any(&self) -> &dyn Any;
}
121
/// A scalar fold over values of one physical type, used to drive the
/// generic [`VecGroupedReduction`] / [`VecMaskGroupedReduction`] wrappers.
pub trait Reducer: Send + Sync + Clone + 'static {
    /// The physical Polars type this reducer consumes.
    type Dtype: PolarsPhysicalType;
    /// The per-group accumulator state.
    type Value: Clone + Send + Sync + 'static;
    /// Returns the identity accumulator value.
    fn init(&self) -> Self::Value;
    /// Hook to convert incoming data before reducing; the default is a
    /// zero-cost borrow (no conversion).
    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Borrowed(s)
    }
    /// Folds accumulator `b` into accumulator `a`.
    fn combine(&self, a: &mut Self::Value, b: &Self::Value);
    /// Folds a single (possibly null) value `b` into accumulator `a`.
    /// `seq_id` orders updates for order-sensitive reducers.
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        seq_id: u64,
    );
    /// Folds an entire chunked array into accumulator `v`.
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
    /// Materializes the accumulators `v`, with optional validity mask `m`,
    /// into a series of `dtype`.
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series>;
}
147
/// A stateless numeric fold (identity + combine + whole-array reduce),
/// adapted into a full [`Reducer`] by [`NumReducer`].
pub trait NumericReduction: Send + Sync + 'static {
    /// The numeric Polars type being reduced.
    type Dtype: PolarsNumericType;
    /// Returns the identity element of the reduction.
    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
    /// Combines two accumulator values into one.
    fn combine(
        a: <Self::Dtype as PolarsNumericType>::Native,
        b: <Self::Dtype as PolarsNumericType>::Native,
    ) -> <Self::Dtype as PolarsNumericType>::Native;
    /// Reduces a whole chunked array; `None` when it has no non-null values.
    fn reduce_ca(
        ca: &ChunkedArray<Self::Dtype>,
    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
}
159
/// Zero-sized adapter turning a stateless [`NumericReduction`] into a
/// [`Reducer`].
struct NumReducer<R: NumericReduction>(PhantomData<R>);
impl<R: NumericReduction> NumReducer<R> {
    fn new() -> Self {
        Self(PhantomData)
    }
}
// Manual impl: `#[derive(Clone)]` would add a spurious `R: Clone` bound,
// while `PhantomData<R>` is always trivially clonable.
impl<R: NumericReduction> Clone for NumReducer<R> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}
171
/// [`Reducer`] implementation that simply delegates to the underlying
/// stateless [`NumericReduction`]; `seq_id` is ignored since numeric
/// reductions here are order-insensitive.
impl<R: NumericReduction> Reducer for NumReducer<R> {
    type Dtype = <R as NumericReduction>::Dtype;
    type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;

    #[inline(always)]
    fn init(&self) -> Self::Value {
        <R as NumericReduction>::init()
    }

    /// Reduce on the physical representation (e.g. for logical types
    /// backed by a numeric physical type).
    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        s.to_physical_repr()
    }

    #[inline(always)]
    fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
        *a = <R as NumericReduction>::combine(*a, *b);
    }

    #[inline(always)]
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        _seq_id: u64,
    ) {
        // Nulls leave the accumulator untouched.
        if let Some(b) = b {
            *a = <R as NumericReduction>::combine(*a, b);
        }
    }

    #[inline(always)]
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
        // An all-null (or empty) array yields None and leaves `v` untouched.
        if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
            *v = <R as NumericReduction>::combine(*v, r);
        }
    }

    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series> {
        let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
        // SAFETY: `arr` holds the native values of `Self::Value`; callers
        // must pass a `dtype` whose physical representation matches it.
        Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
    }
}
220
/// A [`GroupedReduction`] backed by a flat `Vec` of accumulators, for
/// reducers whose identity value is itself a valid output (no per-group
/// validity mask is tracked; contrast with [`VecMaskGroupedReduction`]).
pub struct VecGroupedReduction<R: Reducer> {
    // One accumulator per group.
    values: Vec<R::Value>,
    // Accumulators moved out by evicting updates, in eviction order.
    evicted_values: Vec<R::Value>,
    // Expected dtype of incoming columns; checked on every update.
    in_dtype: DataType,
    reducer: R,
}
227
228impl<R: Reducer> VecGroupedReduction<R> {
229 pub fn new(in_dtype: DataType, reducer: R) -> Self {
230 Self {
231 values: Vec::new(),
232 evicted_values: Vec::new(),
233 in_dtype,
234 reducer,
235 }
236 }
237}
238
impl<R> GroupedReduction for VecGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: Vec::new(),
            evicted_values: Vec::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
    }

    fn resize(&mut self, num_groups: IdxSize) {
        // New groups start at the reducer's identity value.
        self.values.resize(num_groups as usize, self.reducer.init());
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        // This wrapper only supports single-column reductions.
        assert!(values.len() == 1);
        let values = values[0];
        assert!(values.dtype() == &self.in_dtype);
        // NOTE(review): seq_id is bumped by one before use — presumably so
        // order-sensitive reducers can reserve sequence 0 as an "unset"
        // sentinel; confirm against the order-sensitive reducer impls.
        let seq_id = seq_id + 1;
        let values = values.as_materialized_series();
        let values = self.reducer.cast_series(values);
        // Unwrap the Cow and view the series as a ChunkedArray of the
        // reducer's physical dtype.
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        // See NOTE(review) in update_group about the seq_id offset.
        let seq_id = seq_id + 1;
        let values = values.as_materialized_series();
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        // NOTE(review): downcast_as_array suggests the input is expected to
        // be a single chunk here — confirm callers guarantee that.
        let arr = ca.downcast_as_array();
        // SAFETY: caller guarantees `subset` is in-bounds for `values` and
        // the index part of each entry in `group_idxs` is in-bounds for
        // `self.values`.
        unsafe {
            if values.has_nulls() {
                for (i, g) in subset.iter().zip(group_idxs) {
                    let ov = arr.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(g.idx());
                    if g.should_evict() {
                        // Move the old accumulator out and restart this
                        // group at the identity value.
                        let old = core::mem::replace(grp, self.reducer.init());
                        self.evicted_values.push(old);
                    }
                    self.reducer.reduce_one(grp, ov, seq_id);
                }
            } else {
                // Null-free fast path: skip per-value validity checks.
                for (i, g) in subset.iter().zip(group_idxs) {
                    let v = arr.value_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(g.idx());
                    if g.should_evict() {
                        let old = core::mem::replace(grp, self.reducer.init());
                        self.evicted_values.push(old);
                    }
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        // `other` must be the same concrete reduction (trait contract).
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        // SAFETY: caller guarantees `subset` is in-bounds for
        // `other.values` and `group_idxs` is in-bounds for `self.values`.
        unsafe {
            for (i, g) in subset.iter().zip(group_idxs) {
                let v = other.values.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(*g as usize);
                self.reducer.combine(grp, v);
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        // The evicted accumulators become the groups of the new reduction;
        // our eviction buffer is left empty.
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            evicted_values: Vec::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        // No validity mask: every group's accumulator (including the
        // identity for empty groups) is a valid output value here.
        self.reducer.finish(v, None, &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
358
/// A [`GroupedReduction`] like [`VecGroupedReduction`], but with a per-group
/// validity bit so groups that never saw a non-null value finalize as null.
pub struct VecMaskGroupedReduction<R: Reducer> {
    // One accumulator per group.
    values: Vec<R::Value>,
    // Per-group bit: set once the group has received at least one non-null
    // value; becomes the output validity on finalize.
    mask: MutableBitmap,
    // Accumulators moved out by evicting updates, in eviction order.
    evicted_values: Vec<R::Value>,
    // Validity bits of the evicted accumulators, parallel to evicted_values.
    evicted_mask: BitmapBuilder,
    // Expected dtype of incoming columns; checked on every update.
    in_dtype: DataType,
    reducer: R,
}
367
368impl<R: Reducer> VecMaskGroupedReduction<R> {
369 fn new(in_dtype: DataType, reducer: R) -> Self {
370 Self {
371 values: Vec::new(),
372 mask: MutableBitmap::new(),
373 evicted_values: Vec::new(),
374 evicted_mask: BitmapBuilder::new(),
375 in_dtype,
376 reducer,
377 }
378 }
379}
380
impl<R> GroupedReduction for VecMaskGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::new(self.in_dtype.clone(), self.reducer.clone()))
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
        self.mask.reserve(additional)
    }

    fn resize(&mut self, num_groups: IdxSize) {
        // New groups start at the identity value and are invalid until
        // they see a non-null input value.
        self.values.resize(num_groups as usize, self.reducer.init());
        self.mask.resize(num_groups as usize, false);
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &self.in_dtype);
        // NOTE(review): seq_id is bumped by one before use — presumably so
        // order-sensitive reducers can reserve sequence 0 as an "unset"
        // sentinel; confirm against the order-sensitive reducer impls.
        let seq_id = seq_id + 1;
        let values = values.as_materialized_series();
        let values = self.reducer.cast_series(values);
        // Unwrap the Cow and view the series as a ChunkedArray of the
        // reducer's physical dtype.
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        // Mark the group valid iff this batch held at least one non-null.
        if ca.len() != ca.null_count() {
            self.mask.set(group_idx as usize, true);
        }
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        // See NOTE(review) in update_group about the seq_id offset.
        let seq_id = seq_id + 1;
        let values = values.as_materialized_series();
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        // NOTE(review): downcast_as_array suggests the input is expected to
        // be a single chunk here — confirm callers guarantee that.
        let arr = ca.downcast_as_array();
        // SAFETY: caller guarantees `subset` is in-bounds for `values` and
        // the index part of each entry in `group_idxs` is in-bounds for
        // `self.values` (and hence `self.mask`).
        unsafe {
            for (i, g) in subset.iter().zip(group_idxs) {
                let ov = arr.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(g.idx());
                if g.should_evict() {
                    // Move the accumulator and its validity bit out, then
                    // restart the group as identity + invalid.
                    self.evicted_values
                        .push(core::mem::replace(grp, self.reducer.init()));
                    self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
                    self.mask.set_unchecked(g.idx(), false);
                }
                // Nulls neither update the accumulator nor the mask.
                if let Some(v) = ov {
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                    self.mask.set_unchecked(g.idx(), true);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        // `other` must be the same concrete reduction (trait contract).
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        // SAFETY: caller guarantees `subset` is in-bounds for `other` and
        // `group_idxs` is in-bounds for `self`.
        unsafe {
            for (i, g) in subset.iter().zip(group_idxs) {
                let o = other.mask.get_unchecked(*i as usize);
                // Only merge source groups that actually hold a value;
                // invalid source groups leave the target unchanged.
                if o {
                    let v = other.values.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(*g as usize);
                    self.reducer.combine(grp, v);
                    self.mask.set_unchecked(*g as usize, true);
                }
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        // The evicted accumulators (and their validity bits) become the
        // groups of the new reduction; our eviction buffers are emptied.
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            mask: core::mem::take(&mut self.evicted_mask).into_mut(),
            evicted_values: Vec::new(),
            evicted_mask: BitmapBuilder::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        let m = core::mem::take(&mut self.mask);
        // The per-group mask becomes the output validity.
        self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
499
/// A [`GroupedReduction`] for all-null input columns: no per-group state is
/// needed, only the group count, and every group finalizes to the same
/// constant `output` scalar.
#[derive(Clone)]
pub struct NullGroupedReduction {
    // Current number of groups (the output length of finalize).
    num_groups: IdxSize,
    // Number of groups evicted since the last take_evictions.
    num_evictions: IdxSize,
    // The scalar produced for every group.
    output: Scalar,
}
506
507impl NullGroupedReduction {
508 pub fn new(output: Scalar) -> Self {
509 Self {
510 num_groups: 0,
511 num_evictions: 0,
512 output,
513 }
514 }
515}
516
517impl GroupedReduction for NullGroupedReduction {
518 fn new_empty(&self) -> Box<dyn GroupedReduction> {
519 Box::new(Self::new(self.output.clone()))
520 }
521
522 fn reserve(&mut self, _additional: usize) {}
523
524 fn resize(&mut self, num_groups: IdxSize) {
525 self.num_groups = num_groups;
526 }
527
528 fn update_group(
529 &mut self,
530 values: &[&Column],
531 _group_idx: IdxSize,
532 _seq_id: u64,
533 ) -> PolarsResult<()> {
534 let &[values] = values else { unreachable!() };
535 assert!(values.dtype().is_null());
536 Ok(())
537 }
538
539 unsafe fn update_groups_while_evicting(
540 &mut self,
541 values: &[&Column],
542 subset: &[IdxSize],
543 group_idxs: &[EvictIdx],
544 _seq_id: u64,
545 ) -> PolarsResult<()> {
546 let &[values] = values else { unreachable!() };
547 assert!(values.dtype().is_null());
548 assert!(subset.len() == group_idxs.len());
549 for g in group_idxs {
550 self.num_evictions += g.should_evict() as IdxSize;
551 }
552 Ok(())
553 }
554
555 unsafe fn combine_subset(
556 &mut self,
557 other: &dyn GroupedReduction,
558 subset: &[IdxSize],
559 group_idxs: &[IdxSize],
560 ) -> PolarsResult<()> {
561 let _other = other.as_any().downcast_ref::<Self>().unwrap();
562 assert!(subset.len() == group_idxs.len());
563 Ok(())
564 }
565
566 fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
567 Box::new(Self {
568 num_groups: core::mem::replace(&mut self.num_evictions, 0),
569 num_evictions: 0,
570 output: self.output.clone(),
571 })
572 }
573
574 fn finalize(&mut self) -> PolarsResult<Series> {
575 let length = core::mem::replace(&mut self.num_groups, 0) as usize;
576 let s = self.output.clone().into_series(PlSmallStr::EMPTY);
577 Ok(s.new_from_index(0, length))
578 }
579
580 fn as_any(&self) -> &dyn Any {
581 self
582 }
583}