use rayon::prelude::*;
use super::*;
/// Evaluates every aggregation expression in `aggs` over `groups`.
///
/// The expressions are walked with a parallel iterator inside
/// `RAYON.install`. Each one is evaluated on the groups of `df`, finalized
/// into a [`Column`], and checked against the number of groups.
///
/// # Errors
///
/// Propagates the error of an expression whose evaluation fails, and raises
/// the `agg_len` error of `polars_ensure!` when the length of a finalized
/// column differs from `groups.len()`.
pub(super) fn evaluate_aggs(
    df: &DataFrame,
    aggs: &[Arc<dyn PhysicalExpr>],
    groups: &GroupPositions,
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    RAYON.install(|| {
        // Per-expression work: aggregate, finalize, then validate the length.
        let aggregate_one = |expr: &Arc<dyn PhysicalExpr>| -> PolarsResult<Column> {
            let column = expr.evaluate_on_groups(df, groups, state)?.finalize();
            polars_ensure!(
                column.len() == groups.len(),
                agg_len = column.len(),
                groups.len()
            );
            Ok(column)
        };

        aggs.par_iter()
            .map(aggregate_one)
            .collect::<PolarsResult<Vec<_>>>()
    })
}
/// Executor that groups the frame produced by `input` by `keys` and then
/// either evaluates `aggs` on the groups or runs the `apply` callback.
pub struct GroupByExec {
/// Upstream executor; its output is the frame that gets grouped.
input: Box<dyn Executor>,
/// Expressions evaluated against the input frame to obtain the group keys.
keys: Vec<Arc<dyn PhysicalExpr>>,
/// Aggregation expressions evaluated on the groups.
aggs: Vec<Arc<dyn PhysicalExpr>>,
/// Optional callback used instead of `aggs`; it is moved out with `take()`
/// when the executor runs, leaving `None` behind.
apply: Option<PlanCallback<DataFrame, DataFrame>>,
/// Forwarded to `group_by_with_series` when the groups are built.
maintain_order: bool,
/// Schema used to resolve the key field names for the profiling label.
input_schema: SchemaRef,
/// Schema handed to `apply_sliced` when the `apply` callback is used.
output_schema: SchemaRef,
/// Optional `(offset, len)` slice applied to the groups and key columns.
slice: Option<(i64, usize)>,
}
impl GroupByExec {
/// Builds a `GroupByExec`, storing every argument unchanged in the field of
/// the same name. No validation is performed here.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
input: Box<dyn Executor>,
keys: Vec<Arc<dyn PhysicalExpr>>,
aggs: Vec<Arc<dyn PhysicalExpr>>,
apply: Option<PlanCallback<DataFrame, DataFrame>>,
maintain_order: bool,
input_schema: SchemaRef,
output_schema: SchemaRef,
slice: Option<(i64, usize)>,
) -> Self {
Self {
input,
keys,
aggs,
apply,
maintain_order,
input_schema,
output_schema,
slice,
}
}
}
#[allow(clippy::too_many_arguments)]
pub(super) fn group_by_helper(
mut df: DataFrame,
keys: Vec<Column>,
aggs: &[Arc<dyn PhysicalExpr>],
apply: Option<PlanCallback<DataFrame, DataFrame>>,
state: &ExecutionState,
maintain_order: bool,
output_schema: &SchemaRef,
slice: Option<(i64, usize)>,
) -> PolarsResult<DataFrame> {
df.rechunk_mut_par();
let gb = df.group_by_with_series(keys, true, maintain_order)?;
if let Some(f) = apply {
return gb.apply_sliced(slice, move |df| f.call(df), Some(output_schema));
}
let mut groups = gb.get_groups();
#[allow(unused_assignments)]
let mut sliced_groups = None;
if let Some((offset, len)) = slice {
sliced_groups = Some(groups.slice(offset, len));
groups = sliced_groups.as_ref().unwrap();
}
let (mut columns, agg_columns) = RAYON.install(|| {
let get_columns = || gb.keys_sliced(slice);
let get_agg = || evaluate_aggs(&df, aggs, groups, state);
rayon::join(get_columns, get_agg)
});
columns.extend(agg_columns?);
DataFrame::new_infer_height(columns)
}
impl GroupByExec {
    /// Evaluates the key expressions against `df` and delegates the grouping
    /// to [`group_by_helper`].
    ///
    /// The `apply` callback is moved out of `self` with `take()`, so the field
    /// is `None` once this method has run.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while evaluating a key expression, or
    /// any error coming back from `group_by_helper`.
    fn execute_impl(&mut self, state: &ExecutionState, df: DataFrame) -> PolarsResult<DataFrame> {
        // Evaluate the keys in declaration order, stopping at the first failure.
        let mut keys = Vec::with_capacity(self.keys.len());
        for key_expr in &self.keys {
            keys.push(key_expr.evaluate(&df, state)?);
        }

        let apply = self.apply.take();
        group_by_helper(
            df,
            keys,
            &self.aggs,
            apply,
            state,
            self.maintain_order,
            &self.output_schema,
            self.slice,
        )
    }
}
impl Executor for GroupByExec {
    /// Runs the input executor and groups its output.
    ///
    /// The method first asks `state` whether execution should stop, emits the
    /// verbose log lines when `state.verbose()` is set, and executes `input`.
    /// If `state` has a node timer, the grouping is run through `record` on a
    /// clone of the state, labelled with `comma_delimited` applied to
    /// `"group_by"` and the key field names resolved against `input_schema`.
    /// Otherwise `execute_impl` is called directly.
    ///
    /// # Errors
    ///
    /// Propagates errors from `should_stop`, from the input executor, from
    /// resolving the key fields, and from `execute_impl`.
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        state.should_stop()?;

        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run GroupbyExec");
            }
        }
        if state.verbose() {
            eprintln!("keys/aggregates are not partitionable: running default HASH AGGREGATION");
        }

        let df = self.input.execute(state)?;

        // Without a node timer no profiling label is needed.
        if !state.has_node_timer() {
            return self.execute_impl(state, df);
        }

        let key_names = self
            .keys
            .iter()
            .map(|key| key.to_field(&self.input_schema).map(|field| field.name))
            .collect::<PolarsResult<Vec<_>>>()?;
        let profile_name: Cow<'static, str> =
            Cow::Owned(comma_delimited("group_by".to_string(), &key_names));

        let timer_state = state.clone();
        timer_state.record(|| self.execute_impl(state, df), profile_name)
    }
}