mod agg_list;
mod boolean;
mod dispatch;
mod string;
use std::borrow::Cow;
pub use agg_list::*;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::legacy::kernels::take_agg::*;
use arrow::legacy::trusted_len::TrustedLenPush;
use arrow::types::NativeType;
use num_traits::pow::Pow;
use num_traits::{Bounded, Float, Num, NumCast, ToPrimitive, Zero};
use polars_compute::rolling::no_nulls::{
MaxWindow, MinWindow, MomentWindow, QuantileWindow, RollingAggWindowNoNulls,
};
use polars_compute::rolling::nulls::{RollingAggWindowNulls, VarianceMoment};
use polars_compute::rolling::quantile_filter::SealedRolling;
use polars_compute::rolling::{
self, ArgMaxWindow, ArgMinWindow, MeanWindow, QuantileMethod, RollingFnParams,
RollingQuantileParams, RollingVarParams, SumWindow, quantile_filter,
};
use polars_utils::arg_min_max::ArgMinMax;
use polars_utils::float::IsFloat;
#[cfg(feature = "dtype-f16")]
use polars_utils::float16::pf16;
use polars_utils::idx_vec::IdxVec;
use polars_utils::kahan_sum::KahanSum;
use polars_utils::min_max::MinMax;
use rayon::prelude::*;
use crate::POOL;
use crate::chunked_array::cast::CastOptions;
#[cfg(feature = "object")]
use crate::chunked_array::object::extension::create_extension;
use crate::chunked_array::{arg_max_numeric, arg_min_numeric};
#[cfg(feature = "object")]
use crate::frame::group_by::GroupsIndicator;
use crate::prelude::*;
use crate::series::IsSorted;
use crate::series::implementations::SeriesWrap;
use crate::utils::NoNull;
/// Converts a slice of group member indices into an iterator of `usize`
/// positions. The iterator borrows `idx` and keeps its exact length, so
/// callers can rely on `ExactSizeIterator`.
fn idx2usize(idx: &[IdxSize]) -> impl ExactSizeIterator<Item = usize> + '_ {
    idx.iter().copied().map(|i| i as usize)
}
/// Decides whether the rolling (window) kernels can be used for these groups.
///
/// Requires at least two groups, overlapping and monotonically increasing
/// group windows, and data stored in a single chunk (so a contiguous slice
/// of values is available to the kernel).
pub fn _use_rolling_kernels(
    groups: &GroupsSlice,
    overlapping: bool,
    monotonic: bool,
    chunks: &[ArrayRef],
) -> bool {
    // With 0 or 1 groups the rolling machinery buys nothing; a single chunk
    // is mandatory so the kernel can work on one contiguous value slice.
    groups.len() >= 2 && overlapping && monotonic && chunks.len() == 1
}
/// Applies a rolling aggregation window `Agg` over `values` that may contain
/// nulls (as described by `validity`).
///
/// `offsets` yields one `(start, len)` window per output element; each window
/// is fed to the aggregator and the result collected into a `PrimitiveArray`.
/// Windows whose aggregation yields `None` are marked null in the output
/// validity bitmap.
///
/// NOTE(review): the windows produced by `offsets` are assumed to be in
/// bounds of `values` — the `update` call below is unchecked; confirm at the
/// call sites.
pub fn _rolling_apply_agg_window_nulls<Agg, T, O, Out>(
    values: &[T],
    validity: &Bitmap,
    offsets: O,
    params: Option<RollingFnParams>,
) -> PrimitiveArray<Out>
where
    O: Iterator<Item = (IdxSize, IdxSize)> + TrustedLen,
    Agg: RollingAggWindowNulls<T, Out>,
    T: IsFloat + NativeType,
    Out: NativeType,
{
    if values.is_empty() {
        // No input values: emit an empty array of the output primitive type.
        let out: Vec<Out> = vec![];
        return PrimitiveArray::new(Out::PRIMITIVE.into(), out.into(), None);
    }
    // TrustedLen: the size hint's lower bound is the exact output length.
    let output_len = offsets.size_hint().0;
    // Start the window empty; it is grown/moved per offset below.
    let mut agg_window = Agg::new(values, validity, 0, 0, params, None);
    // Output validity (shadows the input bitmap from here on): start all-true
    // and clear positions whose window aggregates to null.
    let mut validity = MutableBitmap::with_capacity(output_len);
    validity.extend_constant(output_len, true);
    let out = offsets
        .enumerate()
        .map(|(idx, (start, len))| {
            let end = start + len;
            unsafe { agg_window.update(start as usize, end as usize) };
            match agg_window.get_agg(idx) {
                Some(val) => val,
                None => {
                    // Window produced no value: write a placeholder and mark null.
                    unsafe { validity.set_unchecked(idx, false) };
                    Out::default()
                },
            }
        })
        .collect_trusted::<Vec<_>>();
    PrimitiveArray::new(Out::PRIMITIVE.into(), out.into(), Some(validity.into()))
}
/// Applies a rolling aggregation window `Agg` over `values` known to contain
/// no nulls.
///
/// `offsets` yields one `(start, len)` window per output element; the window
/// aggregator is updated per window and the results are collected into a
/// `PrimitiveArray` (the collect handles any `None` aggregation results by
/// producing nulls).
///
/// NOTE(review): windows are assumed in bounds of `values` — the `update`
/// call is unchecked; confirm at call sites.
pub fn _rolling_apply_agg_window_no_nulls<Agg, T, O, Out>(
    values: &[T],
    offsets: O,
    params: Option<RollingFnParams>,
) -> PrimitiveArray<Out>
where
    Agg: RollingAggWindowNoNulls<T, Out>,
    O: Iterator<Item = (IdxSize, IdxSize)> + TrustedLen,
    T: IsFloat + NativeType,
    Out: NativeType,
{
    if values.is_empty() {
        // No input values: emit an empty array of the output primitive type.
        let out: Vec<Out> = vec![];
        return PrimitiveArray::new(Out::PRIMITIVE.into(), out.into(), None);
    }
    // Start the window empty; it is moved per offset below.
    let mut agg_window = Agg::new(values, 0, 0, params, None);
    offsets
        .enumerate()
        .map(|(idx, (start, len))| {
            let end = start + len;
            unsafe { agg_window.update(start as usize, end as usize) };
            agg_window.get_agg(idx)
        })
        .collect::<PrimitiveArray<Out>>()
}
/// Takes a contiguous sub-array of `ca` described by a group window:
/// starting at `first`, spanning `len` elements.
pub fn _slice_from_offsets<T>(ca: &ChunkedArray<T>, first: IdxSize, len: IdxSize) -> ChunkedArray<T>
where
    T: PolarsDataType,
{
    let offset = first as i64;
    let length = len as usize;
    ca.slice(offset, length)
}
/// Aggregates index-based groups in parallel into a nullable `Series`.
///
/// `f` is called once per group with `(first_index, group_indices)` and may
/// return `None` to make that group's result null.
pub fn _agg_helper_idx<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn((IdxSize, &IdxVec)) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca = POOL.install(|| groups.into_par_iter().map(f).collect::<ChunkedArray<T>>());
    ca.into_series()
}
/// Aggregates index-based groups in parallel into a `Series` without a
/// validity bitmap: `f` must produce a value for every group.
pub fn _agg_helper_idx_no_null<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn((IdxSize, &IdxVec)) -> T::Native + Send + Sync,
    T: PolarsNumericType,
{
    let ca = POOL.install(|| groups.into_par_iter().map(f).collect::<NoNull<ChunkedArray<T>>>());
    ca.into_inner().into_series()
}
/// Like `_agg_helper_idx`, but `f` only receives the group's index list
/// (no separate first index). Use when the `first` shortcut isn't needed.
fn agg_helper_idx_on_all<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn(&IdxVec) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca = POOL.install(|| groups.all().into_par_iter().map(f).collect::<ChunkedArray<T>>());
    ca.into_series()
}
/// Aggregates slice-based groups (`[first, len]` windows) in parallel into a
/// nullable `Series`; `f` may return `None` per group.
pub fn _agg_helper_slice<T, F>(groups: &[[IdxSize; 2]], f: F) -> Series
where
    F: Fn([IdxSize; 2]) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca = POOL.install(|| groups.par_iter().copied().map(f).collect::<ChunkedArray<T>>());
    ca.into_series()
}
/// Aggregates slice-based groups (`[first, len]` windows) in parallel into a
/// `Series` without a validity bitmap: `f` must produce a value per group.
pub fn _agg_helper_slice_no_null<T, F>(groups: &[[IdxSize; 2]], f: F) -> Series
where
    F: Fn([IdxSize; 2]) -> T::Native + Send + Sync,
    T: PolarsNumericType,
{
    let ca =
        POOL.install(|| groups.par_iter().copied().map(f).collect::<NoNull<ChunkedArray<T>>>());
    ca.into_inner().into_series()
}
/// Internal dispatch trait so integer and float chunked arrays can share the
/// generic group-by quantile/median implementations (`agg_quantile_generic`,
/// `agg_median_generic`) while each uses its fastest kernel.
trait QuantileDispatcher<K> {
    // Consumes `self`: implementations forward to the in-place "faster" kernels.
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<K>>;
    fn _median(self) -> Option<K>;
}
/// Integer inputs: quantile/median are computed as `f64`.
impl<T> QuantileDispatcher<f64> for ChunkedArray<T>
where
    T: PolarsIntegerType,
    T::Native: Ord,
{
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f64>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f64> {
        self.median_faster()
    }
}
/// f16 inputs keep their own precision for quantile/median results.
#[cfg(feature = "dtype-f16")]
impl QuantileDispatcher<pf16> for Float16Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<pf16>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<pf16> {
        self.median_faster()
    }
}
/// f32 inputs keep their own precision for quantile/median results.
impl QuantileDispatcher<f32> for Float32Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f32>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f32> {
        self.median_faster()
    }
}
/// f64 inputs keep their own precision for quantile/median results.
impl QuantileDispatcher<f64> for Float64Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f64>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f64> {
        self.median_faster()
    }
}
/// Generic group-by quantile over a numeric chunked array.
///
/// `T` is the input dtype; `K` is the float dtype used to compute and emit
/// the quantile. A `quantile` outside `[0, 1]` produces an all-null Series.
///
/// # Safety
/// Group indices / slice offsets must be in bounds of `ca`: the index path
/// uses `take_unchecked` and the slice path uses unchecked rolling windows.
unsafe fn agg_quantile_generic<T, K>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    quantile: f64,
    method: QuantileMethod,
) -> Series
where
    T: PolarsNumericType,
    ChunkedArray<T>: QuantileDispatcher<K::Native>,
    K: PolarsNumericType,
    <K as datatypes::PolarsNumericType>::Native: num_traits::Float + quantile_filter::SealedRolling,
{
    let invalid_quantile = !(0.0..=1.0).contains(&quantile);
    if invalid_quantile {
        return Series::full_null(ca.name().clone(), groups.len(), ca.dtype());
    }
    match groups {
        GroupsType::Idx(groups) => {
            // Single chunk so the unchecked take below is over one array.
            let ca = ca.rechunk();
            agg_helper_idx_on_all::<K, _>(groups, |idx| {
                debug_assert!(idx.len() <= ca.len());
                if idx.is_empty() {
                    return None;
                }
                // SAFETY (caller contract): `idx` is in bounds of `ca`.
                let take = { ca.take_unchecked(idx) };
                take._quantile(quantile, method).unwrap_unchecked()
            })
        },
        GroupsType::Slice {
            groups,
            overlapping,
            monotonic,
        } => {
            if _use_rolling_kernels(groups, *overlapping, *monotonic, ca.chunks()) {
                // Rolling path: cast input to the float dtype `K` first so the
                // window kernels operate directly on the output type.
                let s = ca
                    .cast_with_options(&K::get_static_dtype(), CastOptions::Overflowing)
                    .unwrap();
                let ca: &ChunkedArray<K> = s.as_ref().as_ref();
                let arr = ca.downcast_iter().next().unwrap();
                let values = arr.values().as_slice();
                let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                let arr = match arr.validity() {
                    None => _rolling_apply_agg_window_no_nulls::<QuantileWindow<_>, _, _, _>(
                        values,
                        offset_iter,
                        Some(RollingFnParams::Quantile(RollingQuantileParams {
                            prob: quantile,
                            method,
                        })),
                    ),
                    Some(validity) => {
                        _rolling_apply_agg_window_nulls::<rolling::nulls::QuantileWindow<_>, _, _, _>(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Quantile(RollingQuantileParams {
                                prob: quantile,
                                method,
                            })),
                        )
                    },
                };
                ChunkedArray::<K>::with_chunk(PlSmallStr::EMPTY, arr).into_series()
            } else {
                // Fallback: slice each group out and run the exact quantile.
                _agg_helper_slice::<K, _>(groups, |[first, len]| {
                    debug_assert!(first + len <= ca.len() as IdxSize);
                    match len {
                        0 => None,
                        // A single element is its own quantile; just cast it.
                        1 => ca.get(first as usize).map(|v| NumCast::from(v).unwrap()),
                        _ => {
                            let arr_group = _slice_from_offsets(ca, first, len);
                            arr_group
                                ._quantile(quantile, method)
                                .unwrap_unchecked()
                                .map(|flt| NumCast::from(flt).unwrap_unchecked())
                        },
                    }
                })
            }
        },
    }
}
/// Generic group-by median over a numeric chunked array.
///
/// The index path uses the dedicated median kernel; the slice path delegates
/// to `agg_quantile_generic` with quantile 0.5 and linear interpolation.
///
/// # Safety
/// Group indices / slice offsets must be in bounds of `ca`.
unsafe fn agg_median_generic<T, K>(ca: &ChunkedArray<T>, groups: &GroupsType) -> Series
where
    T: PolarsNumericType,
    ChunkedArray<T>: QuantileDispatcher<K::Native>,
    K: PolarsNumericType,
    <K as datatypes::PolarsNumericType>::Native: num_traits::Float + SealedRolling,
{
    match groups {
        GroupsType::Idx(groups) => {
            // Single chunk so the unchecked take below is over one array.
            let ca = ca.rechunk();
            agg_helper_idx_on_all::<K, _>(groups, |idx| {
                debug_assert!(idx.len() <= ca.len());
                if idx.is_empty() {
                    return None;
                }
                // SAFETY (caller contract): `idx` is in bounds of `ca`.
                let take = { ca.take_unchecked(idx) };
                take._median()
            })
        },
        GroupsType::Slice { .. } => {
            agg_quantile_generic::<T, K>(ca, groups, 0.5, QuantileMethod::Linear)
        },
    }
}
/// Shared driver for group-by bitwise reductions (AND / OR / XOR).
///
/// `f` is the per-group reduction (e.g. `ChunkBitwiseReduce::and_reduce`);
/// empty groups yield null. Rechunks once up-front when there are multiple
/// groups so per-group takes/slices operate on a single chunk.
///
/// # Safety
/// Group indices / slice offsets must be in bounds of `ca`.
#[cfg(feature = "bitwise")]
unsafe fn bitwise_agg<T: PolarsNumericType>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    f: fn(&ChunkedArray<T>) -> Option<T::Native>,
) -> Series
where
    ChunkedArray<T>: ChunkTakeUnchecked<[IdxSize]> + ChunkBitwiseReduce<Physical = T::Native>,
{
    let s = if groups.len() > 1 {
        ca.rechunk()
    } else {
        Cow::Borrowed(ca)
    };
    match groups {
        GroupsType::Idx(groups) => agg_helper_idx_on_all::<T, _>(groups, |idx| {
            debug_assert!(idx.len() <= s.len());
            if idx.is_empty() {
                None
            } else {
                // SAFETY (caller contract): `idx` is in bounds of `s`.
                let take = unsafe { s.take_unchecked(idx) };
                f(&take)
            }
        }),
        GroupsType::Slice { groups, .. } => _agg_helper_slice::<T, _>(groups, |[first, len]| {
            debug_assert!(len <= s.len() as IdxSize);
            if len == 0 {
                None
            } else {
                let take = _slice_from_offsets(&s, first, len);
                f(&take)
            }
        }),
    }
}
/// Group-by bitwise reductions; all three forward to the shared `bitwise_agg`
/// driver with the matching `ChunkBitwiseReduce` kernel.
#[cfg(feature = "bitwise")]
impl<T> ChunkedArray<T>
where
    T: PolarsNumericType,
    ChunkedArray<T>: ChunkTakeUnchecked<[IdxSize]> + ChunkBitwiseReduce<Physical = T::Native>,
{
    /// Bitwise AND per group.
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_and(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::and_reduce) }
    }
    /// Bitwise OR per group.
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_or(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::or_reduce) }
    }
    /// Bitwise XOR per group.
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_xor(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::xor_reduce) }
    }
}
/// Core numeric group-by aggregations: min, arg-min, max, arg-max and sum.
///
/// Every method is `unsafe`: the caller must guarantee that all group
/// indices / slice offsets are in bounds of `self`.
impl<T> ChunkedArray<T>
where
    T: PolarsNumericType + Sync,
    T::Native: NativeType + PartialOrd + Num + NumCast + Zero + Bounded + std::iter::Sum<T::Native>,
    ChunkedArray<T>: ChunkAgg<T::Native>,
{
    /// Minimum per group; empty groups yield null. NaNs are ignored for the
    /// comparison (`min_ignore_nan`).
    pub(crate) unsafe fn agg_min(&self, groups: &GroupsType) -> Series {
        // Fast paths: when the groups are sorted and the values are sorted,
        // the group minimum is simply the first (ascending) or last
        // (descending) non-null value in each group.
        if groups.is_sorted_flag() {
            match self.is_sorted_flag() {
                IsSorted::Ascending => {
                    return self.clone().into_series().agg_first_non_null(groups);
                },
                IsSorted::Descending => {
                    return self.clone().into_series().agg_last_non_null(groups);
                },
                _ => {},
            }
        }
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked gathers work on one array.
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        // Single-element group: its value is the minimum.
                        arr.get(first as usize)
                    } else if no_nulls {
                        take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.min_ignore_nan(b))
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.min_ignore_nan(b))
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Overlapping monotonic windows over a single chunk:
                    // use the incremental rolling-min kernels.
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MinWindow<_>, _, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<rolling::nulls::MinWindow<_>, _, _, _>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    Self::from(arr).into_series()
                } else {
                    // Fallback: slice each group out and reduce it.
                    _agg_helper_slice::<T, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                ChunkAgg::min(&arr_group)
                            },
                        }
                    })
                }
            },
        }
    }
    /// Position (within each group) of the minimum value; empty or all-null
    /// groups yield null. Nulls are skipped; NaN handling follows
    /// `nan_max_lt`.
    pub(crate) unsafe fn agg_arg_min(&self, groups: &GroupsType) -> Series
    where
        for<'b> &'b [T::Native]: ArgMinMax,
    {
        // Sorted fast paths mirror `agg_min`, but return the *position* of
        // the first/last non-null value instead of the value itself.
        if groups.is_sorted_flag() {
            match self.is_sorted_flag() {
                IsSorted::Ascending => {
                    return self.clone().into_series().agg_arg_first_non_null(groups);
                },
                IsSorted::Descending => {
                    return self.clone().into_series().agg_arg_last_non_null(groups);
                },
                _ => {},
            }
        }
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked value reads work on one array.
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = !arr.has_nulls();
                agg_helper_idx_on_all::<IdxType, _>(groups, |idx| {
                    if idx.is_empty() {
                        return None;
                    }
                    if no_nulls {
                        // Seed with the first element, then scan the rest.
                        let first_i = idx[0] as usize;
                        let mut best_pos: IdxSize = 0;
                        let mut best_val: T::Native = unsafe { arr.value_unchecked(first_i) };
                        for (pos, &i) in idx.iter().enumerate().skip(1) {
                            let v = unsafe { arr.value_unchecked(i as usize) };
                            if v.nan_max_lt(&best_val) {
                                best_val = v;
                                best_pos = pos as IdxSize;
                            }
                        }
                        Some(best_pos)
                    } else {
                        // Seed with the first non-null element (None if the
                        // whole group is null), then scan the remainder.
                        let (start_pos, mut best_val) = idx
                            .iter()
                            .enumerate()
                            .find_map(|(pos, &i)| arr.get(i as usize).map(|v| (pos, v)))?;
                        let mut best_pos: IdxSize = start_pos as IdxSize;
                        for (pos, &i) in idx.iter().enumerate().skip(start_pos + 1) {
                            if let Some(v) = arr.get(i as usize) {
                                if v.nan_max_lt(&best_val) {
                                    best_val = v;
                                    best_pos = pos as IdxSize;
                                }
                            }
                        }
                        Some(best_pos)
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Rolling arg-min kernels over overlapping windows.
                    let arr = self.downcast_as_array();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let idx_arr = match arr.validity() {
                        None => {
                            _rolling_apply_agg_window_no_nulls::<ArgMinWindow<_>, _, _, IdxSize>(
                                values,
                                offset_iter,
                                None,
                            )
                        },
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<ArgMinWindow<_>, _, _, IdxSize>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    IdxCa::from(idx_arr).into_series()
                } else {
                    _agg_helper_slice::<IdxType, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            // Single-element group: the minimum is at position 0.
                            1 => Some(0 as IdxSize),
                            _ => {
                                let group_ca = _slice_from_offsets(self, first, len);
                                let pos_in_group: Option<usize> = arg_min_numeric(&group_ca);
                                pos_in_group.map(|p| p as IdxSize)
                            },
                        }
                    })
                }
            },
        }
    }
    /// Maximum per group; empty groups yield null. NaNs are ignored for the
    /// comparison (`max_ignore_nan`). Mirrors `agg_min` with the comparisons
    /// and sorted fast paths flipped.
    pub(crate) unsafe fn agg_max(&self, groups: &GroupsType) -> Series {
        if groups.is_sorted_flag() {
            match self.is_sorted_flag() {
                IsSorted::Ascending => return self.clone().into_series().agg_last_non_null(groups),
                IsSorted::Descending => {
                    return self.clone().into_series().agg_first_non_null(groups);
                },
                _ => {},
            }
        }
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked gathers work on one array.
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize)
                    } else if no_nulls {
                        take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.max_ignore_nan(b))
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.max_ignore_nan(b))
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Incremental rolling-max kernels over overlapping windows.
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MaxWindow<_>, _, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<rolling::nulls::MaxWindow<_>, _, _, _>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                ChunkAgg::max(&arr_group)
                            },
                        }
                    })
                }
            },
        }
    }
    /// Position (within each group) of the maximum value; empty or all-null
    /// groups yield null. Mirrors `agg_arg_min` with `nan_min_gt`.
    pub(crate) unsafe fn agg_arg_max(&self, groups: &GroupsType) -> Series
    where
        for<'b> &'b [T::Native]: ArgMinMax,
    {
        if groups.is_sorted_flag() {
            match self.is_sorted_flag() {
                IsSorted::Ascending => {
                    return self.clone().into_series().agg_arg_last_non_null(groups);
                },
                IsSorted::Descending => {
                    return self.clone().into_series().agg_arg_first_non_null(groups);
                },
                _ => {},
            }
        }
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked value reads work on one array.
                let ca = self.rechunk();
                let arr = ca.downcast_as_array();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<IdxType, _>(groups, |idx| {
                    if idx.is_empty() {
                        return None;
                    }
                    if no_nulls {
                        // Seed with the first element, then scan the rest.
                        let first_i = idx[0] as usize;
                        let mut best_pos: IdxSize = 0;
                        let mut best_val: T::Native = unsafe { arr.value_unchecked(first_i) };
                        for (pos, &i) in idx.iter().enumerate().skip(1) {
                            let v = unsafe { arr.value_unchecked(i as usize) };
                            if v.nan_min_gt(&best_val) {
                                best_val = v;
                                best_pos = pos as IdxSize;
                            }
                        }
                        Some(best_pos)
                    } else {
                        // Seed with the first non-null element (None if the
                        // whole group is null), then scan the remainder.
                        let (start_pos, mut best_val) = idx
                            .iter()
                            .enumerate()
                            .find_map(|(pos, &i)| arr.get(i as usize).map(|v| (pos, v)))?;
                        let mut best_pos: IdxSize = start_pos as IdxSize;
                        for (pos, &i) in idx.iter().enumerate().skip(start_pos + 1) {
                            if let Some(v) = arr.get(i as usize) {
                                if v.nan_min_gt(&best_val) {
                                    best_val = v;
                                    best_pos = pos as IdxSize;
                                }
                            }
                        }
                        Some(best_pos)
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Rolling arg-max kernels over overlapping windows.
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let idx_arr = match arr.validity() {
                        None => {
                            _rolling_apply_agg_window_no_nulls::<ArgMaxWindow<_>, _, _, IdxSize>(
                                values,
                                offset_iter,
                                None,
                            )
                        },
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<ArgMaxWindow<_>, _, _, IdxSize>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    IdxCa::from(idx_arr).into_series()
                } else {
                    _agg_helper_slice::<IdxType, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            // Single-element group: the maximum is at position 0.
                            1 => Some(0 as IdxSize),
                            _ => {
                                let group_ca = _slice_from_offsets(self, first, len);
                                let pos_in_group: Option<usize> = arg_max_numeric(&group_ca);
                                pos_in_group.map(|p| p as IdxSize)
                            },
                        }
                    })
                }
            },
        }
    }
    /// Sum per group. Empty groups sum to zero (never null); nulls are
    /// skipped. Floats are accumulated with Kahan compensated summation to
    /// reduce rounding error.
    pub(crate) unsafe fn agg_sum(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked gathers work on one array.
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx_no_null::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        // Empty group: sum is zero by convention.
                        T::Native::zero()
                    } else if idx.len() == 1 {
                        arr.get(first as usize).unwrap_or(T::Native::zero())
                    } else if no_nulls {
                        if T::Native::is_float() {
                            // Kahan summation for floats, plain fold otherwise.
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(KahanSum::default(), |k, x| k + x)
                                .sum()
                        } else {
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(T::Native::zero(), |a, b| a + b)
                        }
                    } else if T::Native::is_float() {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .fold(KahanSum::default(), |k, x| k + x)
                            .sum()
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .fold(T::Native::zero(), |a, b| a + b)
                    }
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    // Incremental rolling-sum kernels over overlapping windows.
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            SumWindow<T::Native, T::Native>,
                            _,
                            _,
                            _,
                        >(values, offset_iter, None),
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<
                                SumWindow<T::Native, T::Native>,
                                _,
                                _,
                                _,
                            >(values, validity, offset_iter, None)
                        },
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice_no_null::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => T::Native::zero(),
                            1 => self.get(first as usize).unwrap_or(T::Native::zero()),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.sum().unwrap_or(T::Native::zero())
                            },
                        }
                    })
                }
            },
        }
    }
}
/// Float group-by aggregations requiring floating-point math:
/// mean, variance and standard deviation.
///
/// Every method is `unsafe`: the caller must guarantee that all group
/// indices / slice offsets are in bounds of the wrapped array.
impl<T> SeriesWrap<ChunkedArray<T>>
where
    T: PolarsFloatType,
    ChunkedArray<T>: ChunkVar
        + VarAggSeries
        + ChunkQuantile<T::Native>
        + QuantileAggSeries
        + ChunkAgg<T::Native>,
    T::Native: Pow<T::Native, Output = T::Native>,
{
    /// Mean per group; empty or all-null groups yield null. Sums are
    /// accumulated in `f64` with Kahan compensation, then cast back to the
    /// native float type.
    pub(crate) unsafe fn agg_mean(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked gathers work on one array.
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= self.len());
                    let out = if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        // Single-element group: its value is the mean.
                        arr.get(first as usize).map(|sum| sum.to_f64().unwrap())
                    } else if no_nulls {
                        Some(
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(KahanSum::default(), |a, b| {
                                    a + b.to_f64().unwrap_unchecked()
                                })
                                .sum()
                                / idx.len() as f64,
                        )
                    } else {
                        // With nulls: track the null count so the divisor is
                        // the number of *valid* elements.
                        take_agg_primitive_iter_unchecked_count_nulls(
                            arr,
                            idx2usize(idx),
                            KahanSum::default(),
                            |a, b| a + b.to_f64().unwrap_unchecked(),
                            idx.len() as IdxSize,
                        )
                        .map(|(sum, null_count)| sum.sum() / (idx.len() as f64 - null_count as f64))
                    };
                    out.map(|flt| NumCast::from(flt).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    // Incremental rolling-mean kernels over overlapping windows.
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MeanWindow<_>, _, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<MeanWindow<_>, _, _, _>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    ChunkedArray::<T>::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.mean().map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }
    /// Variance per group with `ddof` delta degrees of freedom.
    /// Single-element groups yield 0 when `ddof == 0`, otherwise null.
    pub(crate) unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series
    where
        <T as datatypes::PolarsNumericType>::Native: num_traits::Float,
    {
        let ca = &self.0.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                // NOTE(review): `ca` was already rechunked above, so this
                // second rechunk looks redundant — confirm and simplify.
                let ca = ca.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<T, _>(groups, |idx| {
                    debug_assert!(idx.len() <= ca.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    out.map(|flt| NumCast::from(flt).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    // Incremental rolling-variance kernels over overlapping windows.
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                            _,
                        >(
                            values,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                            _,
                        >(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                    };
                    ChunkedArray::<T>::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                // Population variance of one sample is 0;
                                // sample variance (ddof > 0) is undefined.
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.var(ddof).map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }
    /// Standard deviation per group with `ddof` delta degrees of freedom:
    /// the square root of the per-group variance. Single-element groups
    /// yield 0 when `ddof == 0`, otherwise null.
    pub(crate) unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series
    where
        <T as datatypes::PolarsNumericType>::Native: num_traits::Float,
    {
        let ca = &self.0.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<T, _>(groups, |idx| {
                    debug_assert!(idx.len() <= ca.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    // std = sqrt(var), computed in f64 before casting back.
                    out.map(|flt| NumCast::from(flt.sqrt()).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    // Rolling variance kernels, then a sqrt pass over the result.
                    let arr = ca.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                            _,
                        >(
                            values,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MomentWindow<_, rolling::nulls::VarianceMoment>,
                            _,
                            _,
                            _,
                        >(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                    };
                    let mut ca = ChunkedArray::<T>::from(arr);
                    // In-place sqrt via powf(0.5) in the native float type.
                    ca.apply_mut(|v| v.powf(NumCast::from(0.5).unwrap()));
                    ca.into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                // Population std of one sample is 0;
                                // sample std (ddof > 0) is undefined.
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.std(ddof).map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }
}
/// f32 group-by quantile/median: forwards to the generic implementations
/// with `Float32Type` as the computation/output dtype.
impl Float32Chunked {
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float32Type>(self, groups, quantile, method)
    }
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float32Type>(self, groups)
    }
}
/// f64 group-by quantile/median: forwards to the generic implementations
/// with `Float64Type` as the computation/output dtype.
impl Float64Chunked {
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float64Type>(self, groups, quantile, method)
    }
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float64Type>(self, groups)
    }
}
/// Integer group-by aggregations whose results are floating point:
/// mean, variance, standard deviation, quantile and median — all emitted
/// as `f64`.
///
/// Every method is `unsafe`: the caller must guarantee that all group
/// indices / slice offsets are in bounds of `self`.
impl<T> ChunkedArray<T>
where
    T: PolarsIntegerType,
    ChunkedArray<T>: ChunkAgg<T::Native> + ChunkVar,
    T::Native: NumericNative + Ord,
{
    /// Mean per group as `f64`; empty or all-null groups yield null.
    pub(crate) unsafe fn agg_mean(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked gathers work on one array.
                let ca = self.rechunk();
                let arr = ca.downcast_get(0).unwrap();
                _agg_helper_idx::<Float64Type, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        // Single-element group: its value is the mean.
                        self.get(first as usize).map(|sum| sum.to_f64().unwrap())
                    } else {
                        match (self.has_nulls(), self.chunks.len()) {
                            // Single chunk, no nulls: Kahan-sum then divide.
                            (false, 1) => Some(
                                take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                    .fold(KahanSum::default(), |a, b| a + b.to_f64().unwrap())
                                    .sum()
                                    / idx.len() as f64,
                            ),
                            // Single chunk with nulls: divide by valid count.
                            (_, 1) => {
                                take_agg_primitive_iter_unchecked_count_nulls(
                                    arr,
                                    idx2usize(idx),
                                    KahanSum::default(),
                                    |a, b| a + b.to_f64().unwrap(),
                                    idx.len() as IdxSize,
                                )
                            }
                            .map(|(sum, null_count)| {
                                sum.sum() / (idx.len() as f64 - null_count as f64)
                            }),
                            // Multi-chunk fallback: gather then mean.
                            _ => {
                                let take = { self.take_unchecked(idx) };
                                take.mean()
                            },
                        }
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Rolling path: cast to f64 and reuse the float implementation.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_mean(groups)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize).map(|v| NumCast::from(v).unwrap()),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.mean()
                            },
                        }
                    })
                }
            },
        }
    }
    /// Variance per group as `f64` with `ddof` delta degrees of freedom.
    /// Single-element groups yield 0 when `ddof == 0`, otherwise null.
    pub(crate) unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked variance kernels work on one array.
                let ca_self = self.rechunk();
                let arr = ca_self.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        return None;
                    }
                    if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Rolling path: cast to f64 and reuse the float implementation.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_var(groups, ddof)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                // Population variance of one sample is 0;
                                // sample variance (ddof > 0) is undefined.
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.var(ddof)
                            },
                        }
                    })
                }
            },
        }
    }
    /// Standard deviation per group as `f64` with `ddof` delta degrees of
    /// freedom. Single-element groups yield 0 when `ddof == 0`, otherwise null.
    pub(crate) unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so the unchecked variance kernels work on one array.
                let ca_self = self.rechunk();
                let arr = ca_self.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    // std = sqrt(var).
                    out.map(|v| v.sqrt())
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Rolling path: cast to f64 and reuse the float implementation.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_std(groups, ddof)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                // Population std of one sample is 0;
                                // sample std (ddof > 0) is undefined.
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.std(ddof)
                            },
                        }
                    })
                }
            },
        }
    }
    /// Quantile per group, computed and emitted as `f64`.
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float64Type>(self, groups, quantile, method)
    }
    /// Median per group, computed and emitted as `f64`.
    /// # Safety: group indices/offsets must be in bounds of `self`.
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float64Type>(self, groups)
    }
}