use polars_time::{PolarsTemporalGroupby, RollingGroupOptions};
use super::*;
pub(crate) struct RollingExpr {
pub(crate) function: Expr,
pub(crate) phys_function: Arc<dyn PhysicalExpr>,
pub(crate) out_name: Option<PlSmallStr>,
pub(crate) options: RollingGroupOptions,
pub(crate) expr: Expr,
}
impl PhysicalExpr for RollingExpr {
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let groups_key = format!("{:?}", &self.options);
let groups = {
state.window_cache.get_groups(&groups_key).clone()
};
let groups = match groups {
Some(groups) => groups,
None => {
let (_time_key, _keys, groups) = df.rolling(vec![], &self.options)?;
state.window_cache.insert_groups(groups_key, groups.clone());
groups
},
};
let mut out = self
.phys_function
.evaluate_on_groups(df, &groups, state)?
.finalize();
polars_ensure!(out.len() == groups.len(), agg_len = out.len(), groups.len());
if let Some(name) = &self.out_name {
out.rename(name.clone());
}
Ok(out.into_column())
}
fn evaluate_on_groups<'a>(
&self,
_df: &DataFrame,
_groups: &'a GroupPositions,
_state: &ExecutionState,
) -> PolarsResult<AggregationContext<'a>> {
polars_bail!(InvalidOperation: "rolling expression not allowed in aggregation");
}
fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.phys_function.collect_live_columns(lv);
}
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.function.to_field(input_schema, Context::Default)
}
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}
fn is_scalar(&self) -> bool {
false
}
}