mod cache;
mod drop_duplicates;
mod executor;
mod explode;
mod ext_context;
mod filter;
mod groupby;
mod groupby_dynamic;
mod groupby_partitioned;
mod groupby_rolling;
mod join;
mod melt;
mod projection;
#[cfg(feature = "python")]
mod python_scan;
mod scan;
mod slice;
mod sort;
mod stack;
mod udf;
mod union;
use std::borrow::Cow;
use std::path::PathBuf;
pub use executor::*;
use polars_core::POOL;
use polars_plan::global::FETCH_ROWS;
use polars_plan::utils::*;
use rayon::prelude::*;
pub(super) use self::cache::*;
pub(super) use self::drop_duplicates::*;
pub(super) use self::explode::*;
pub(super) use self::ext_context::*;
pub(super) use self::filter::*;
pub(super) use self::groupby::*;
#[cfg(feature = "dynamic_groupby")]
pub(super) use self::groupby_dynamic::*;
pub(super) use self::groupby_partitioned::*;
#[cfg(feature = "dynamic_groupby")]
pub(super) use self::groupby_rolling::*;
pub(super) use self::join::*;
pub(super) use self::melt::*;
pub(super) use self::projection::*;
#[cfg(feature = "python")]
pub(super) use self::python_scan::*;
pub(super) use self::scan::*;
pub(super) use self::slice::*;
pub(super) use self::sort::*;
pub(super) use self::stack::*;
pub(super) use self::udf::*;
pub(super) use self::union::*;
use super::*;
use crate::physical_plan::state::StateFlags;
/// Evaluate a projection that contains window expressions.
///
/// Expressions without any `Expr::Window` node are evaluated in parallel on [`POOL`].
/// Window expressions are bucketed by the `Debug` rendering of their `partition_by`
/// keys; each bucket is evaluated sequentially on its own `state.split()` so that
/// expressions over the same partition can share cached intermediate state.
///
/// Returns one `Series` per entry of `exprs`, in the same order as `exprs`.
///
/// # Errors
/// Returns the first error produced by an expression's `evaluate`.
///
/// # Panics
/// Panics if `as_expression()` returns `None` for any of `exprs`.
fn execute_projection_cached_window_fns(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Series>> {
    // (partition key, [(position in `exprs`, explode, expr)]). A `Vec` with a linear
    // lookup keeps the buckets in first-seen order.
    #[allow(clippy::type_complexity)] // nested tuple type is local to this function
    let mut windows: Vec<(String, Vec<(usize, bool, Arc<dyn PhysicalExpr>)>)> = vec![];
    let mut other = Vec::with_capacity(exprs.len());

    for (index, phys) in exprs.iter().enumerate() {
        // The first window node encountered while walking the expression decides
        // which bucket the whole expression goes into.
        let first_window = phys
            .as_expression()
            .unwrap()
            .into_iter()
            .find_map(|e| match e {
                Expr::Window {
                    partition_by,
                    options,
                    ..
                } => Some((format!("{:?}", partition_by.as_slice()), options.explode)),
                _ => None,
            });

        match first_window {
            Some((key, explode)) => {
                let entry = (index, explode, phys.clone());
                match windows.iter_mut().find(|(k, _)| *k == key) {
                    Some((_, bucket)) => bucket.push(entry),
                    None => windows.push((key, vec![entry])),
                }
            }
            None => other.push((index, phys)),
        }
    }

    let mut selected_columns = POOL.install(|| {
        other
            .par_iter()
            .map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s)))
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    for (_key, mut bucket) in windows {
        // Each bucket is evaluated on its own split of the state.
        // NOTE(review): presumably `split()` starts from an empty window cache so cached
        // results cannot leak between partition keys — confirm in `ExecutionState::split`.
        let mut state = state.split();

        // `!explode` sorts `true` before `false`: exploding window expressions run first.
        bucket.sort_unstable_by_key(|(_, explode, _)| !*explode);

        for (index, _, phys) in bucket {
            // Caching is decided per expression, right before it runs: it is enabled only
            // when the expression contains exactly one window node, and disabled otherwise.
            // This applies even when the bucket holds a single expression.
            let window_count = phys
                .as_expression()
                .unwrap()
                .into_iter()
                .filter(|e| matches!(e, Expr::Window { .. }))
                .count();
            if window_count == 1 {
                state.flags.insert(StateFlags::CACHE_WINDOW_EXPR);
            } else {
                state.flags.remove(StateFlags::CACHE_WINDOW_EXPR);
            }
            selected_columns.push((index, phys.evaluate(df, &state)?));
        }
    }

    // Restore the caller's column order.
    selected_columns.sort_unstable_by_key(|(index, _)| *index);
    Ok(selected_columns.into_iter().map(|(_, s)| s).collect())
}
pub(crate) fn evaluate_physical_expressions(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &mut ExecutionState,
has_windows: bool,
) -> PolarsResult<DataFrame> {
let zero_length = df.height() == 0;
let selected_columns = if has_windows {
execute_projection_cached_window_fns(df, exprs, state)?
} else {
POOL.install(|| {
exprs
.par_iter()
.map(|expr| expr.evaluate(df, state))
.collect::<PolarsResult<_>>()
})?
};
check_expand_literals(selected_columns, zero_length)
}
/// Assemble evaluated projection columns into a `DataFrame`, broadcasting length-1
/// columns to the length of the longest column.
///
/// * `selected_columns` – the evaluated expressions, in output order.
/// * `zero_length` – whether the input frame had height 0. If so, the result is cut
///   down to its shortest column.
///
/// An empty `selected_columns` (a projection of zero expressions) yields a
/// `DataFrame` without columns instead of panicking.
///
/// # Errors
/// * `PolarsError::Duplicate` if two columns share a name.
/// * `PolarsError::ComputeError` if the lengths differ and some column's length is
///   neither 0, 1, nor the maximum column length.
fn check_expand_literals(
    mut selected_columns: Vec<Series>,
    zero_length: bool,
) -> PolarsResult<DataFrame> {
    // Guard against an empty projection: there is no first column to compare against,
    // and indexing `[0]` would panic.
    if selected_columns.is_empty() {
        return Ok(DataFrame::new_no_checks(selected_columns));
    }
    let first_len = selected_columns[0].len();

    let mut df_height = 0;
    let mut all_equal_len = true;
    {
        // `names` borrows from `selected_columns`; scope it so that borrow ends before
        // the columns are moved below.
        let mut names = PlHashSet::with_capacity(selected_columns.len());
        for s in &selected_columns {
            let len = s.len();
            df_height = std::cmp::max(df_height, len);
            if len != first_len {
                all_equal_len = false;
            }
            let name = s.name();
            if !names.insert(name) {
                return Err(PolarsError::Duplicate(
                    format!("Column with name: '{name}' has more than one occurrences").into(),
                ));
            }
        }
    }

    if !all_equal_len {
        selected_columns = selected_columns
            .into_iter()
            .map(|series| {
                if series.len() == 1 && df_height > 1 {
                    // Broadcast a single value (e.g. a literal) to the frame height.
                    Ok(series.new_from_index(0, df_height))
                } else if series.len() == df_height || series.len() == 0 {
                    Ok(series)
                } else {
                    Err(PolarsError::ComputeError(
                        format!(
                            "Series {series:?} does not match the DataFrame height of {df_height}",
                        )
                        .into(),
                    ))
                }
            })
            .collect::<PolarsResult<_>>()?
    }

    let df = DataFrame::new_no_checks(selected_columns);

    // A literal projected onto an empty input would otherwise produce rows; cut the
    // frame down to its shortest column.
    let df = if zero_length {
        let min = df.get_columns().iter().map(|s| s.len()).min();
        if min.is_some() {
            df.head(min)
        } else {
            df
        }
    } else {
        df
    };
    Ok(df)
}