use polars_plan::constants::CSE_REPLACED;
use polars_utils::itertools::Itertools;
use super::*;
/// Returns the name of the field that `s` resolves to under `input_schema`.
///
/// # Errors
///
/// Propagates, unchanged, any error returned by `s.to_field(input_schema)`.
pub(super) fn profile_name(
    s: &dyn PhysicalExpr,
    input_schema: &Schema,
) -> PolarsResult<PlSmallStr> {
    // Only the name of the resolved field is needed; the rest of the field is dropped.
    s.to_field(input_schema).map(|field| field.name)
}
/// A physical expression paired with a `u32` id.
///
/// In `execute_projection_cached_window_fns` the id is the expression's position in the
/// projection list; the evaluated columns are sorted by it to restore projection order.
type IdAndExpression = (u32, Arc<dyn PhysicalExpr>);
/// Evaluates the rolling expressions, grouped by their [`RollingGroupOptions`].
///
/// For every `(options, partition)` entry of `rolling` this:
/// 1. splits off a fresh execution state,
/// 2. computes the rolling groups once via `df.rolling(None, options)`,
/// 3. stores those groups in the split state's window cache, keyed by the `Debug`
///    rendering of `options`,
/// 4. evaluates every expression of the partition against `df` with that state.
///
/// Both the partitions and the expressions inside a partition are iterated with `par_iter`
/// inside `RAYON.install`.
///
/// Returns one `Vec` per partition holding `(id, column)` pairs.
///
/// # Errors
///
/// Returns the error of a failing `df.rolling` call or expression evaluation.
#[cfg(feature = "dynamic_group_by")]
fn rolling_evaluate(
    df: &DataFrame,
    state: &ExecutionState,
    rolling: PlHashMap<RollingGroupOptions, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
    RAYON.install(|| {
        rolling
            .par_iter()
            .map(|(options, partition)| {
                // Every partition works on its own split of the state.
                let partition_state = state.split();

                // The groups are computed once and shared by the whole partition.
                let (_time_key, groups) = df.rolling(None, options)?;
                partition_state
                    .window_cache
                    .insert_groups(format!("{options:?}"), groups);

                partition
                    .par_iter()
                    .map(|(idx, expr)| {
                        let column = expr.evaluate(df, &partition_state)?;
                        Ok((*idx, column))
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            })
            .collect()
    })
}
fn window_evaluate(
df: &DataFrame,
state: &ExecutionState,
window: PlHashMap<String, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
if window.is_empty() {
return Ok(vec![]);
}
let n_threads = RAYON.current_num_threads();
let max_hor = window.values().map(|v| v.len()).max().unwrap_or(0);
let vert = window.len();
let (cache, par_vertical, par_horizontal) = if max_hor >= n_threads || max_hor >= vert {
(true, false, true)
} else {
(false, true, true)
};
let apply = |partition: &[(u32, Arc<dyn PhysicalExpr>)]| {
let mut state = state.split();
state.insert_has_window_function_flag();
let cache = cache
&& partition.len() > 1
&& partition.iter().all(|(_, e)| {
e.as_expression()
.unwrap()
.into_iter()
.filter(|e| {
#[cfg(feature = "dynamic_group_by")]
if matches!(e, Expr::Rolling { .. }) {
return true;
}
matches!(e, Expr::Over { .. })
})
.count()
== 1
});
let mut first_result = None;
if cache {
let first = &partition[0];
let c = first.1.evaluate(df, &state)?;
first_result = Some((first.0, c));
state.insert_cache_window_flag();
} else {
state.remove_cache_window_flag();
}
let apply =
|index: &u32, e: &Arc<dyn PhysicalExpr>| e.evaluate(df, &state).map(|c| (*index, c));
let slice = &partition[first_result.is_some() as usize..];
let mut results = if par_horizontal {
slice
.par_iter()
.map(|(index, e)| apply(index, e))
.collect::<PolarsResult<Vec<_>>>()?
} else {
slice
.iter()
.map(|(index, e)| apply(index, e))
.collect::<PolarsResult<Vec<_>>>()?
};
if let Some(item) = first_result {
results.push(item)
}
Ok(results)
};
if par_vertical {
RAYON.install(|| window.par_iter().map(|t| apply(t.1)).collect())
} else {
window.iter().map(|t| apply(t.1)).collect()
}
}
/// Evaluates a projection that contains window expressions.
///
/// The expressions are split into three buckets, each entry remembering its position in
/// `exprs`:
/// * rolling expressions (feature `dynamic_group_by`), bucketed by their
///   [`RollingGroupOptions`] when the index column is a plain `Expr::Column`;
/// * `Expr::Over` expressions, bucketed by a string key built from the partition-by
///   expressions, the mapping strategy and the optional order-by;
/// * everything else.
///
/// An expression is assigned to the bucket of the first matching node found while iterating
/// its logical expression. Physical expressions without a logical expression go to the
/// "everything else" bucket.
///
/// The "everything else" bucket is evaluated in parallel first, then the rolling and window
/// buckets (via `RAYON.join` when `dynamic_group_by` is enabled). Finally the columns are
/// sorted by their remembered position so the output follows the order of `exprs`.
///
/// # Errors
///
/// Returns the error of a failing expression evaluation. With `dynamic_group_by` enabled, a
/// rolling error is reported before a window error.
fn execute_projection_cached_window_fns(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    #[allow(clippy::type_complexity)]
    let mut windows: PlHashMap<String, Vec<IdAndExpression>> = PlHashMap::default();
    #[cfg(feature = "dynamic_group_by")]
    let mut rolling: PlHashMap<RollingGroupOptions, Vec<IdAndExpression>> = PlHashMap::default();
    let mut other = Vec::with_capacity(exprs.len());

    for (index, phys) in exprs.iter().enumerate_u32() {
        let mut is_window = false;

        // Without a logical expression there is nothing to inspect; `is_window` stays false.
        if let Some(root) = phys.as_expression() {
            for node in root.into_iter() {
                match node {
                    #[cfg(feature = "dynamic_group_by")]
                    Expr::Rolling {
                        function: _,
                        index_column,
                        period,
                        offset,
                        closed_window,
                    } => {
                        // Only rolling over a plain column is bucketed here; otherwise keep
                        // scanning the remaining nodes.
                        if let Expr::Column(index_column) = index_column.as_ref() {
                            let options = RollingGroupOptions {
                                index_column: index_column.clone(),
                                period: *period,
                                offset: *offset,
                                closed_window: *closed_window,
                            };
                            rolling
                                .entry(options)
                                .or_default()
                                .push((index, phys.clone()));
                            is_window = true;
                            break;
                        }
                    },
                    Expr::Over {
                        function: _,
                        partition_by,
                        order_by,
                        mapping,
                    } => {
                        let mapping: &str = mapping.into();
                        let mut key = format!("{:?}_{mapping}", partition_by.as_slice());
                        if let Some((e, k)) = order_by {
                            polars_expr::prelude::window_function_format_order_by(
                                &mut key,
                                e.as_ref(),
                                k,
                            )
                        }
                        windows.entry(key).or_default().push((index, phys.clone()));
                        is_window = true;
                        break;
                    },
                    _ => {},
                }
            }
        }

        if !is_window {
            other.push((index, phys.as_ref()));
        }
    }

    let mut selected_columns = RAYON.install(|| {
        other
            .par_iter()
            .map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s)))
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    // The partition results are owned, so their columns are moved into `selected_columns`
    // rather than cloned.
    #[cfg(feature = "dynamic_group_by")]
    {
        let (rolling_result, window_result) = RAYON.join(
            || rolling_evaluate(df, state, rolling),
            || window_evaluate(df, state, windows),
        );
        selected_columns.extend(rolling_result?.into_iter().flatten());
        selected_columns.extend(window_result?.into_iter().flatten());
    }
    #[cfg(not(feature = "dynamic_group_by"))]
    {
        let window_result = window_evaluate(df, state, windows)?;
        selected_columns.extend(window_result.into_iter().flatten());
    }

    // Bucketing scrambled the order; the stored positions restore it.
    selected_columns.sort_unstable_by_key(|tpl| tpl.0);
    Ok(selected_columns
        .into_iter()
        .map(|(_, column)| column)
        .collect())
}
/// Evaluates every expression of `exprs` against `df` using a parallel iterator inside
/// `RAYON.install`, returning the columns in the order of `exprs`.
///
/// # Errors
///
/// Returns the error of a failing expression evaluation.
fn run_exprs_par(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    let evaluate = |expr: &Arc<dyn PhysicalExpr>| expr.evaluate(df, state);
    RAYON.install(|| {
        exprs
            .par_iter()
            .map(evaluate)
            .collect::<PolarsResult<Vec<_>>>()
    })
}
/// Evaluates every expression of `exprs` against `df` one after another on the calling
/// thread, returning the columns in the order of `exprs`.
///
/// # Errors
///
/// Stops at, and returns, the first evaluation error.
fn run_exprs_seq(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    let mut columns = Vec::with_capacity(exprs.len());
    for expr in exprs {
        columns.push(expr.evaluate(df, state)?);
    }
    Ok(columns)
}
pub(super) fn evaluate_physical_expressions(
df: &mut DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
has_windows: bool,
run_parallel: bool,
) -> PolarsResult<Vec<Column>> {
let expr_runner = if has_windows {
execute_projection_cached_window_fns
} else if run_parallel && exprs.len() > 1 {
run_exprs_par
} else {
run_exprs_seq
};
let selected_columns = expr_runner(df, exprs, state)?;
if has_windows {
state.clear_window_expr_cache();
}
Ok(selected_columns)
}
/// Builds the output [`DataFrame`] of a projection from its evaluated columns.
///
/// * `df` is the input frame; it is only inspected to decide whether scalar verification is
///   enabled (it is skipped when the name of the last input column starts with
///   `CSE_REPLACED`).
/// * `phys_expr` are the expressions that produced `selected_columns`, zipped pairwise.
/// * `is_empty`: when set, the result is truncated to the length of its shortest column.
/// * `options.duplicate_check`: reject duplicate output names.
/// * `options.should_broadcast`: when the columns differ in length, broadcast length-1
///   columns to the common height.
///
/// Returns an empty frame when `selected_columns` is empty.
///
/// # Errors
///
/// * `Duplicate` if `duplicate_check` is set and two columns share a name.
/// * `ShapeMismatch` if broadcasting is requested and a column can neither be kept nor
///   broadcast, or if a length-1 column comes from a non-scalar expression (unless the
///   environment variable `POLARS_ALLOW_NON_SCALAR_EXP` is `1`).
pub(super) fn check_expand_literals(
    df: &DataFrame,
    phys_expr: &[Arc<dyn PhysicalExpr>],
    mut selected_columns: Vec<Column>,
    is_empty: bool,
    options: ProjectionOptions,
) -> PolarsResult<DataFrame> {
    let first_len = match selected_columns.first() {
        Some(column) => column.len(),
        None => return Ok(DataFrame::empty()),
    };
    let duplicate_check = options.duplicate_check;
    let should_broadcast = options.should_broadcast;

    // Scalar verification is skipped when the last input column carries the CSE marker.
    let verify_scalar = df.columns().is_empty()
        || !df.columns()[df.width() - 1]
            .name()
            .starts_with(CSE_REPLACED);

    // First pass: gather length statistics and (optionally) reject duplicate names.
    let mut df_height = 0;
    let mut has_empty = false;
    let mut all_equal_len = true;
    {
        let mut names = PlHashSet::with_capacity(selected_columns.len());
        for column in selected_columns.iter() {
            let len = column.len();
            if len == 0 {
                has_empty = true;
            }
            if len > df_height {
                df_height = len;
            }
            all_equal_len &= len == first_len;

            let name = column.name();
            if duplicate_check && !names.insert(name) {
                let msg = format!(
                    "the name '{name}' is duplicate\n\n\
                    It's possible that multiple expressions are returning the same default column \
                    name. If this is the case, try renaming the columns with \
                    `.alias(\"new_name\")` to avoid duplicate column names."
                );
                return Err(PolarsError::Duplicate(msg.into()));
            }
        }
    }

    // Second pass: only needed when lengths differ and broadcasting is requested.
    if should_broadcast && !all_equal_len {
        // Length-1 columns are broadcast to the frame height, or to 0 if any column is empty.
        let broadcast_len = if has_empty { 0 } else { df_height };

        selected_columns = selected_columns
            .into_iter()
            .zip(phys_expr)
            .map(|(series, phys)| -> PolarsResult<Column> {
                let len = series.len();

                if len != 1 {
                    // Kept as-is when it already has the frame height, or when it is empty
                    // and the frame height is 1.
                    if len == df_height || (len == 0 && df_height == 1) {
                        return Ok(series);
                    }
                    polars_bail!(
                        ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",
                        series.len(), df_height
                    );
                }

                // From here on the column has exactly one value.
                if !has_empty && df_height == 1 {
                    return Ok(series);
                }
                if has_empty {
                    polars_ensure!(df_height == 1,
                        ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",
                        series.len(), df_height
                    );
                }

                let reject_non_scalar = verify_scalar
                    && !phys.is_scalar()
                    && std::env::var("POLARS_ALLOW_NON_SCALAR_EXP").as_deref() != Ok("1");
                if reject_non_scalar {
                    let identifier = match phys.as_expression() {
                        Some(e) => format!("expression: {e}"),
                        None => "this Series".to_string(),
                    };
                    polars_bail!(ShapeMismatch: "Series {}, length {} doesn't match the DataFrame height of {}\n\n\
                        If you want {} to be broadcasted, ensure it is a scalar (for instance by adding '.first()').",
                        series.name(), series.len(), broadcast_len, identifier
                    );
                }

                Ok(series.new_from_index(0, broadcast_len))
            })
            .collect::<PolarsResult<_>>()?;
    }

    // NOTE(review): the unchecked constructor skips validation; this presumably relies on the
    // name/length handling above (or on the caller when those options are disabled) -- confirm.
    let df = unsafe { DataFrame::new_unchecked_infer_height(selected_columns) };

    if !is_empty {
        return Ok(df);
    }

    // Truncate to the shortest column.
    let min = df.columns().iter().map(|column| column.len()).min();
    Ok(match min {
        Some(_) => df.head(min),
        None => df,
    })
}