use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use super::*;
/// Executor that runs an input executor and then evaluates a list of physical
/// expressions against the resulting `DataFrame` to produce the projected frame.
pub struct ProjectionExec {
/// Upstream executor; its output frame is the one the expressions are evaluated on.
pub(crate) input: Box<dyn Executor>,
/// Physical expressions evaluated against the input frame.
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
/// Forwarded to `evaluate_physical_expressions`; presumably signals that `expr`
/// contains window expressions (confirm against the planner).
pub(crate) has_windows: bool,
/// Schema of the input frame; used when building per-expression profile names.
pub(crate) input_schema: SchemaRef,
/// Test builds only: expected output schema, whose column names are asserted
/// against the columns actually produced.
#[cfg(test)]
pub(crate) schema: SchemaRef,
/// Projection options; `run_parallel` is read here and the whole value is passed
/// to `check_expand_literals`.
pub(crate) options: ProjectionOptions,
/// Gate for the chunk-wise path in which each chunk of the input frame is
/// projected separately on the thread pool.
pub(crate) allow_vertical_parallelism: bool,
}
impl ProjectionExec {
    /// Evaluates the projection expressions against `df` and returns the projected frame.
    ///
    /// The chunk-wise path is taken when all of the following hold: vertical parallelism
    /// is allowed, the first column has more than one chunk, the frame height exceeds
    /// twice the thread count of `RAYON`, and `options.run_parallel` is set. In that case
    /// every chunk is projected on its own inside the pool and the partial frames are
    /// concatenated vertically. Otherwise the whole frame is projected in a single pass.
    ///
    /// # Errors
    /// Propagates any error returned by `evaluate_physical_expressions` or
    /// `check_expand_literals`.
    fn execute_impl(
        &mut self,
        state: &ExecutionState,
        mut df: DataFrame,
    ) -> PolarsResult<DataFrame> {
        // One projection step, shared by the chunk-wise and the single-pass path.
        let project = |frame: &mut DataFrame| -> PolarsResult<DataFrame> {
            let columns = evaluate_physical_expressions(
                &mut *frame,
                &self.expr,
                state,
                self.has_windows,
                self.options.run_parallel,
            )?;
            // Queried after evaluation, exactly where the call arguments are evaluated.
            let zero_shape = frame.shape_has_zero();
            check_expand_literals(&*frame, &self.expr, columns, zero_shape, self.options)
        };

        let go_vertical = self.allow_vertical_parallelism
            && df.first_col_n_chunks() > 1
            && df.height() > RAYON.current_num_threads() * 2
            && self.options.run_parallel;

        let df = if go_vertical {
            let parts: Vec<_> = df.split_chunks().collect();
            // The parallel iterator is lazy; the work happens in `collect` inside the pool.
            let par_projection = parts.into_par_iter().map(|mut part| project(&mut part));
            let projected = RAYON.install(|| par_projection.collect::<PolarsResult<Vec<_>>>())?;
            accumulate_dataframes_vertical_unchecked(projected)
        } else {
            project(&mut df)?
        };

        // Test builds only: the produced column names must match the expected schema.
        #[cfg(test)]
        #[allow(unused_must_use)]
        {
            let expected_names = self.schema.iter_names();
            for (column, expected) in df.columns().iter().zip(expected_names) {
                assert_eq!(column.name(), expected);
            }
        }

        Ok(df)
    }
}
impl Executor for ProjectionExec {
    /// Runs the input executor and projects its output frame.
    ///
    /// Returns early with an error if `state.should_stop()` fails. When the state has a
    /// node timer, the projection is run through `record` under a label built from
    /// `"select"` and the profile name of every expression; otherwise it runs directly.
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        state.should_stop()?;

        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run ProjectionExec");
            }
        }

        let df = self.input.execute(state)?;

        // Without a node timer there is nothing to label or record.
        if !state.has_node_timer() {
            return self.execute_impl(state, df);
        }

        let input_schema = self.input_schema.as_ref();
        let expr_names = self
            .expr
            .iter()
            .map(|e| profile_name(e.as_ref(), input_schema))
            .collect::<PolarsResult<Vec<_>>>()?;
        let timer_label = Cow::Owned(comma_delimited("select".to_string(), &expr_names));

        // The recording runs on a clone while the projection still uses the original state.
        let timed_state = state.clone();
        timed_state.record(|| self.execute_impl(state, df), timer_label)
    }
}