// polars-expr 0.53.0
//
// Physical expression implementation of the Polars project.
// Documentation
use std::ops::BitAnd;

use arrow::temporal_conversions::MICROSECONDS_IN_DAY as US_IN_DAY;
use polars_core::error::PolarsResult;
use polars_core::prelude::{
    AnyValue, ChunkCast, Column, DataType, IntoColumn, NamedFrom, RollingOptionsFixedWindow,
    TimeUnit,
};
use polars_core::scalar::Scalar;
use polars_core::series::Series;
#[cfg(feature = "cov")]
use polars_plan::dsl::RollingCovOptions;
use polars_plan::prelude::PlanCallback;
use polars_time::prelude::SeriesOpsTime;
use polars_utils::pl_str::PlSmallStr;

/// Runs the rolling operation `op` on `s`, routing temporal inputs through their
/// physical representation and mapping the result back to a temporal dtype.
///
/// Result handling by input dtype:
/// - `Date`: the output of `op` is multiplied by `US_IN_DAY`, cast to `Int64` and
///   returned as a microsecond `Datetime` without a time zone.
/// - `Datetime` / `Duration` / `Time`: the output is cast to `Int64` and converted
///   back to the input's logical type (same time unit and time zone).
/// - any other dtype: the output of `op` is returned as-is.
///
/// # Errors
/// Propagates errors from `op` and from the `Int64` casts.
fn roll_with_temporal_conversion<F: FnOnce(&Series) -> PolarsResult<Series>>(
    s: &Column,
    op: F,
) -> PolarsResult<Column> {
    let dt = s.dtype();

    // Declared up front so the physical column outlives the borrow taken below.
    let physical;
    let input: &Column = if dt.is_temporal() {
        physical = s.to_physical_repr();
        &physical
    } else {
        s
    };

    // @scalar-opt
    let out = op(input.as_materialized_series())?;

    let converted = match dt {
        DataType::Date => {
            let micros = (out * US_IN_DAY as f64).cast(&DataType::Int64)?;
            micros.into_datetime(TimeUnit::Microseconds, None)
        },
        DataType::Datetime(tu, tz) => {
            let physical_out = out.cast(&DataType::Int64)?;
            physical_out.into_datetime(*tu, tz.clone())
        },
        DataType::Duration(tu) => {
            let physical_out = out.cast(&DataType::Int64)?;
            physical_out.into_duration(*tu)
        },
        DataType::Time => {
            let physical_out = out.cast(&DataType::Int64)?;
            physical_out.into_time()
        },
        _ => out,
    };

    Ok(converted.into_column())
}

/// Fixed-window rolling minimum of `s`, configured by `options`.
///
/// The column is materialized to a `Series` before the rolling kernel runs.
pub(super) fn rolling_min(s: &Column, options: RollingOptionsFixedWindow) -> PolarsResult<Column> {
    // @scalar-opt
    let series = s.as_materialized_series();
    let out = series.rolling_min(options)?;
    Ok(Column::from(out))
}

/// Fixed-window rolling maximum of `s`, configured by `options`.
///
/// The column is materialized to a `Series` before the rolling kernel runs.
pub(super) fn rolling_max(s: &Column, options: RollingOptionsFixedWindow) -> PolarsResult<Column> {
    // @scalar-opt
    let series = s.as_materialized_series();
    let out = series.rolling_max(options)?;
    Ok(Column::from(out))
}

pub(super) fn rolling_mean(s: &Column, options: RollingOptionsFixedWindow) -> PolarsResult<Column> {
    roll_with_temporal_conversion(s, |s| s.rolling_mean(options))
}

/// Fixed-window rolling sum of `s`, configured by `options`.
///
/// The column is materialized to a `Series` before the rolling kernel runs.
pub(super) fn rolling_sum(s: &Column, options: RollingOptionsFixedWindow) -> PolarsResult<Column> {
    // @scalar-opt
    let series = s.as_materialized_series();
    let out = series.rolling_sum(options)?;
    Ok(Column::from(out))
}

pub(super) fn rolling_quantile(
    s: &Column,
    options: RollingOptionsFixedWindow,
) -> PolarsResult<Column> {
    roll_with_temporal_conversion(s, |s| s.rolling_quantile(options))
}

/// Fixed-window rolling variance of `s`, configured by `options`.
///
/// The column is materialized to a `Series` before the rolling kernel runs.
pub(super) fn rolling_var(s: &Column, options: RollingOptionsFixedWindow) -> PolarsResult<Column> {
    // @scalar-opt
    let series = s.as_materialized_series();
    let out = series.rolling_var(options)?;
    Ok(Column::from(out))
}

/// Fixed-window rolling standard deviation of `s`, configured by `options`.
///
/// The column is materialized to a `Series` before the rolling kernel runs.
pub(super) fn rolling_std(s: &Column, options: RollingOptionsFixedWindow) -> PolarsResult<Column> {
    // @scalar-opt
    let series = s.as_materialized_series();
    let out = series.rolling_std(options)?;
    Ok(Column::from(out))
}

/// Fixed-window rolling rank of `s`, configured by `options`.
///
/// The column is materialized to a `Series` before the rolling kernel runs.
pub(super) fn rolling_rank(s: &Column, options: RollingOptionsFixedWindow) -> PolarsResult<Column> {
    // @scalar-opt
    let series = s.as_materialized_series();
    let out = series.rolling_rank(options)?;
    Ok(Column::from(out))
}

/// Fixed-window rolling skewness of `s`, computed by `polars_ops::series::rolling_skew`.
///
/// Only compiled when the `moment` feature is enabled.
#[cfg(feature = "moment")]
pub(super) fn rolling_skew(s: &Column, options: RollingOptionsFixedWindow) -> PolarsResult<Column> {
    // @scalar-opt
    let skewed = polars_ops::series::rolling_skew(s.as_materialized_series(), options)?;
    Ok(Column::from(skewed))
}

/// Fixed-window rolling kurtosis of `s`, computed by `polars_ops::series::rolling_kurtosis`.
///
/// Only compiled when the `moment` feature is enabled.
#[cfg(feature = "moment")]
pub(super) fn rolling_kurtosis(
    s: &Column,
    options: RollingOptionsFixedWindow,
) -> PolarsResult<Column> {
    // @scalar-opt
    let kurt = polars_ops::series::rolling_kurtosis(s.as_materialized_series(), options)?;
    Ok(Column::from(kurt))
}

/// Builds the per-row observation count for a null-free input of length `len`.
///
/// Row `i` (0-based) gets `min(window_size, i + 1)`, stored as a float of the
/// requested `dtype` in an unnamed `Series`.
///
/// # Panics
/// Hits `unreachable!()` when `dtype` is not one of the float types handled below.
#[cfg(feature = "cov")]
fn det_count_x_y(window_size: usize, len: usize, dtype: &DataType) -> Series {
    // 1-based row position, capped at the window size.
    let counts = (1..=len).map(|pos| pos.min(window_size));

    match dtype {
        DataType::Float64 => {
            let values = counts.map(|c| c as f64).collect::<Vec<_>>();
            Series::new(PlSmallStr::EMPTY, values)
        },
        DataType::Float32 => {
            let values = counts.map(|c| c as f32).collect::<Vec<_>>();
            Series::new(PlSmallStr::EMPTY, values)
        },
        #[cfg(feature = "dtype-f16")]
        DataType::Float16 => {
            use num_traits::AsPrimitive;
            use polars_utils::float16::pf16;
            let values = counts.map(AsPrimitive::<pf16>::as_).collect::<Vec<_>>();
            Series::new(PlSmallStr::EMPTY, values)
        },
        _ => unreachable!(),
    }
}

/// Computes the rolling covariance of `s[0]` and `s[1]`, or their rolling
/// correlation when `is_corr` is `true`.
///
/// Steps, as implemented below:
/// 1. Both inputs are rechunked and, if not already float, cast to `Float64`.
///    The dtype of `x` after casting is used for the count and `ddof` series.
/// 2. If either input has nulls, both arrays get the validity mask
///    `x.is_not_null() & y.is_not_null()` and the per-window count is the rolling
///    sum of that mask; otherwise the count comes from `det_count_x_y`.
/// 3. `numerator = (mean(x*y) - mean(x) * mean(y)) * count / (count - ddof)`.
/// 4. For correlation the numerator is divided by `(var(x) * var(y)) ^ 0.5`.
///
/// # Errors
/// Returns any error produced by the casts, the rolling kernels, the series
/// arithmetic or `pow`; none of these panic on failure.
///
/// # Panics
/// Panics if `s` contains fewer than two columns (direct indexing of `s[0]`/`s[1]`).
#[cfg(feature = "cov")]
pub(super) fn rolling_corr_cov(
    s: &[Column],
    rolling_options: RollingOptionsFixedWindow,
    cov_options: RollingCovOptions,
    is_corr: bool,
) -> PolarsResult<Column> {
    let mut x = s[0].as_materialized_series().rechunk();
    let mut y = s[1].as_materialized_series().rechunk();

    if !x.dtype().is_float() {
        x = x.cast(&DataType::Float64)?;
    }
    if !y.dtype().is_float() {
        y = y.cast(&DataType::Float64)?;
    }
    let dtype = x.dtype().clone();

    // Computed before the validity masks are rewritten below.
    let mean_x_y = (&x * &y)?.rolling_mean(rolling_options.clone())?;
    let rolling_options_count = RollingOptionsFixedWindow {
        window_size: rolling_options.window_size,
        min_periods: 0,
        ..Default::default()
    };

    let count_x_y = if (x.null_count() + y.null_count()) > 0 {
        // mask out nulls on both sides before compute mean/var

        let valids = x.is_not_null().bitand(y.is_not_null());
        let valids_arr = valids.downcast_as_array();
        let valids_bitmap = valids_arr.values();

        // SAFETY (as understood from this block): both series were rechunked above,
        // so the code addresses their first chunk; only the validity mask of that
        // array is swapped and `compute_len` is called afterwards to refresh the
        // cached metadata.
        // NOTE(review): presumably `with_validity` needs the bitmap length to equal
        // the array length, i.e. `x` and `y` must have the same length here —
        // confirm for broadcast (length-1) inputs.
        unsafe {
            let xarr = &mut x.chunks_mut()[0];
            *xarr = xarr.with_validity(Some(valids_bitmap.clone()));
            let yarr = &mut y.chunks_mut()[0];
            *yarr = yarr.with_validity(Some(valids_bitmap.clone()));
            x.compute_len();
            y.compute_len();
        }
        valids.cast(&dtype)?.rolling_sum(rolling_options_count)?
    } else {
        det_count_x_y(rolling_options.window_size, x.len(), &dtype)
    };

    let mean_x = x.rolling_mean(rolling_options.clone())?;
    let mean_y = y.rolling_mean(rolling_options.clone())?;
    let ddof = Series::new(
        PlSmallStr::EMPTY,
        &[AnyValue::from(cov_options.ddof).cast(&dtype)],
    );

    // E[xy] - E[x]E[y], then rescaled by count / (count - ddof).
    let raw_cov = (mean_x_y - (mean_x * mean_y)?)?;
    let ddof_scale = (count_x_y.clone() / (count_x_y - ddof)?)?;
    let numerator = (raw_cov * ddof_scale)?;

    if is_corr {
        let var_x = x.rolling_var(rolling_options.clone())?;
        let var_y = y.rolling_var(rolling_options)?;

        let base = (var_x * var_y)?;
        // Exponent 0.5 in the working dtype: the denominator is sqrt(var_x * var_y).
        let sc = Scalar::new(
            base.dtype().clone(),
            AnyValue::Float64(0.5).cast(&dtype).into_static(),
        );
        let denominator = super::pow::pow(&mut [base.into_column(), sc.into_column("".into())])?
            .take_materialized_series();

        Ok((numerator / denominator)?.into_column())
    } else {
        Ok(numerator.into_column())
    }
}

/// Applies the user callback `f` to every rolling window of `c`.
///
/// Each window is cloned and handed to `f`; the callback's output is then
/// strict-cast to the window's dtype. Errors from `f`, from the cast, or from
/// the rolling driver are propagated.
pub fn rolling_map(
    c: &Column,
    rolling_options: RollingOptionsFixedWindow,
    f: PlanCallback<Series, Series>,
) -> PolarsResult<Column> {
    let series = c.as_materialized_series();
    let out = series.rolling_map(
        &(|window: &Series| {
            let mapped = f.call(window.clone())?;
            mapped.strict_cast(window.dtype())
        }) as &_,
        rolling_options,
    )?;
    Ok(Column::from(out))
}