use polars_core::prelude::*;
use polars_error::feature_gated;
use crate::plans::optimizer::parquet_metadata_prune::prune_parquet_metadata;
use crate::plans::optimizer::projection_pushdown::projection_pushdown;
use crate::prelude::*;
mod delay_rechunk;
mod cluster_with_columns;
mod collapse_and_project;
mod collect_members;
#[cfg(feature = "cse")]
mod cse;
#[cfg(feature = "merge_sorted")]
mod flatten_merge_sorted;
mod flatten_union;
#[cfg(feature = "fused")]
mod fused;
mod join_utils;
pub(crate) use join_utils::ExprOrigin;
mod expand_datasets;
#[cfg(feature = "python")]
pub use expand_datasets::ExpandedPythonScan;
mod collapse_sort;
pub mod deep_copy;
mod ir_traversal;
mod parquet_metadata_prune;
mod predicate_pushdown;
mod projection_pushdown;
mod simplify_expr;
pub mod simplify_ordering;
mod slice_pushdown_expr;
mod slice_pushdown_lp;
mod sortedness;
mod stack_opt;
use collapse_and_project::SimpleProjectionAndCollapse;
#[cfg(feature = "cse")]
pub use cse::NaiveExprMerger;
use delay_rechunk::DelayRechunk;
pub use expand_datasets::ExpandedDataset;
use polars_core::config::verbose;
pub use predicate_pushdown::{
DynamicPred, DynamicPredWeakRef, PredicateExpr, PredicatePushDown, TrivialPredicateExpr,
};
pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
use slice_pushdown_lp::SlicePushDown;
pub use sortedness::{
AExprSorted, IRPlanSorted, IRSorted, are_keys_sorted_any, expr_is_sorted, is_sorted,
};
pub use stack_opt::{OptimizationRule, OptimizeExprContext, StackOptimizer};
#[cfg(feature = "merge_sorted")]
use self::flatten_merge_sorted::FlattenMergeSortedRule;
use self::flatten_union::FlattenUnionRule;
pub use crate::frame::{AllowedOptimizations, OptFlags};
pub use crate::plans::conversion::type_coercion::TypeCoercionRule;
#[cfg(feature = "cse")]
use crate::plans::optimizer::cse::CommonSubExprOptimizer;
#[cfg(feature = "cse")]
use crate::plans::visitor::*;
use crate::prelude::optimizer::collect_members::MemberCollector;
/// A transformation that takes a [`DslPlan`] and produces a (possibly rewritten) [`DslPlan`].
pub trait Optimize {
    /// Transform `logical_plan`, returning the resulting plan or an error.
    fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;
}
/// Default, and also upper bound, for the initial capacity chosen by [`init_hashmap`].
const HASHMAP_SIZE: usize = 16;

/// Create an empty [`PlHashMap`] with a small pre-allocated capacity.
///
/// The capacity is `max_len` clamped to at most [`HASHMAP_SIZE`]. When `max_len` is `None`,
/// exactly [`HASHMAP_SIZE`] is used.
pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {
    let capacity = match max_len {
        Some(len) => len.min(HASHMAP_SIZE),
        None => HASHMAP_SIZE,
    };
    PlHashMap::with_capacity(capacity)
}
/// Whether the `POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS` environment variable is set to exactly `"1"`.
///
/// Any other value yields `false`. So does an unset variable or one whose value is not valid
/// Unicode, because `std::env::var` returns `Err` in both of those cases.
pub(crate) fn pushdown_maintain_errors() -> bool {
    match std::env::var("POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS") {
        Ok(value) => value == "1",
        Err(_) => false,
    }
}
/// Lower a [`DslPlan`] to IR and run the optimization passes enabled in `opt_flags`.
///
/// The DSL plan is first converted with `to_alp` into `ir_arena` / `expr_arena`. The passes
/// then run in this fixed order:
///
/// 1. Common-subplan elimination.
/// 2. Slice pushdown.
/// 3. Predicate pushdown.
/// 4. Cache-state setting.
/// 5. Sort collapsing.
/// 6. Projection pushdown.
/// 7. The rule-based [`StackOptimizer`] loop.
/// 8. A repeated slice pushdown, when needed.
/// 9. `with_columns` clustering.
/// 10. Common-subexpression elimination.
/// 11. Ordering simplification.
/// 12. Dataset expansion.
/// 13. Parquet metadata pruning.
///
/// `scratch` is only used by the `cse` feature. `apply_scan_predicate_to_scan_ir` is
/// forwarded to dataset expansion.
///
/// Returns the node of the root of the optimized plan.
///
/// # Errors
/// Propagates errors from the DSL-to-IR conversion and from every fallible pass.
///
/// # Panics
/// In debug builds, panics if optimization changed the plan's output column names.
pub fn optimize(
    logical_plan: DslPlan,
    mut opt_flags: OptFlags,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    scratch: &mut Vec<Node>,
    apply_scan_predicate_to_scan_ir: fn(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
    ) -> PolarsResult<()>,
) -> PolarsResult<Node> {
    // Only read on `cse` code paths; unused when that feature is disabled.
    #[allow(unused_variables)]
    let verbose = verbose();
    let opt = StackOptimizer {};
    let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);

    // In eager mode, turn off both common-subexpression and common-subplan elimination.
    #[cfg(feature = "cse")]
    if opt_flags.contains(OptFlags::EAGER) {
        opt_flags &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBPLAN_ELIM);
    }

    let mut root = to_alp(logical_plan, expr_arena, ir_arena, &mut opt_flags)?;

    // The CSE toggles are read after `to_alp`, which receives `&mut opt_flags`.
    #[cfg(feature = "cse")]
    let comm_subplan_elim = opt_flags.contains(OptFlags::COMM_SUBPLAN_ELIM);
    #[cfg(not(feature = "cse"))]
    let comm_subplan_elim = false;
    #[cfg(feature = "cse")]
    let comm_subexpr_elim = opt_flags.contains(OptFlags::COMM_SUBEXPR_ELIM);
    #[cfg(not(feature = "cse"))]
    let comm_subexpr_elim = false;
    let pushdown_maintain_errors = pushdown_maintain_errors();

    // Output schema of the unoptimized plan, checked against the result at the end.
    #[cfg(debug_assertions)]
    let prev_schema = ir_arena.get(root).schema(ir_arena).into_owned();

    // Plan members are collected lazily on first use and then cached. Later uses therefore
    // see the plan as it was when the members were first collected.
    let mut _opt_members: &mut Option<MemberCollector> = &mut None;
    macro_rules! get_or_init_members {
        () => {
            _get_or_init_members(_opt_members, root, ir_arena, expr_arena)
        };
    }

    if opt_flags.simplify_expr() {
        #[cfg(feature = "fused")]
        rules.push(Box::new(fused::FusedArithmetic {}));
    }

    #[cfg(feature = "cse")]
    let mut run_set_cache_states = false;
    if comm_subplan_elim {
        feature_gated!("cse", {
            let members = get_or_init_members!();
            // Run CSPE only when the plan has multiple sinks or joins/unions, scans the same
            // source more than once, and contains no cache node yet.
            if (members.has_sink_multiple || members.has_joins_or_unions)
                && members.has_duplicate_scans()
                && !members.has_cache
            {
                if verbose {
                    eprintln!("found multiple sources; run comm_subplan_elim")
                }
                run_set_cache_states = cse::cspe::common_subplan_elimination(
                    root,
                    ir_arena,
                    expr_arena,
                    polars_config::config().allow_nested_cspe(),
                );
            }
        });
    }

    let mut repeat_slice_pd_after_filter_pd = false;
    if opt_flags.slice_pushdown() {
        let mut slice_pushdown_opt = SlicePushDown::new();
        let ir = slice_pushdown_opt.optimize(root, ir_arena, expr_arena)?;
        ir_arena.replace(root, ir);
        // If a slice node is still in the plan, slice pushdown runs again further below.
        repeat_slice_pd_after_filter_pd = slice_pushdown_opt.slice_node_in_optimized_plan;
    }

    if opt_flags.predicate_pushdown() {
        let mut predicate_pushdown_opt =
            PredicatePushDown::new(pushdown_maintain_errors, opt_flags.streaming());
        let ir = ir_arena.take(root);
        let ir = predicate_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;
        ir_arena.replace(root, ir);
    }

    #[cfg(feature = "cse")]
    if run_set_cache_states {
        cse::set_cache_states(
            root,
            ir_arena,
            expr_arena,
            scratch,
            verbose,
            pushdown_maintain_errors,
            opt_flags.streaming(),
        )?;
    }

    if opt_flags.contains(OptFlags::SORT_COLLAPSE) {
        root = opt.optimize_loop(
            &mut [Box::new(collapse_sort::CollapseSort {}) as _],
            expr_arena,
            ir_arena,
            root,
        )?;
    }

    if opt_flags.projection_pushdown() {
        projection_pushdown(root, ir_arena, expr_arena);
    }

    // The remaining rules are applied together in a single `StackOptimizer` loop.
    if opt_flags.fast_projection() {
        rules.push(Box::new(SimpleProjectionAndCollapse::new(
            opt_flags.eager(),
        )));
    }
    if !opt_flags.eager() {
        rules.push(Box::new(DelayRechunk::new()));
    }
    if opt_flags.simplify_expr() {
        rules.push(Box::new(SimplifyBooleanRule {}));
    }
    if !opt_flags.eager() {
        #[cfg(feature = "merge_sorted")]
        rules.push(Box::new(FlattenMergeSortedRule::new()));
        rules.push(Box::new(FlattenUnionRule {}));
    }
    root = opt.optimize_loop(&mut rules, expr_arena, ir_arena, root)?;

    if repeat_slice_pd_after_filter_pd {
        let mut slice_pushdown_opt = SlicePushDown::new();
        let ir = slice_pushdown_opt.optimize(root, ir_arena, expr_arena)?;
        ir_arena.replace(root, ir);
    }

    if opt_flags.cluster_with_columns() && get_or_init_members!().with_columns_count > 1 {
        cluster_with_columns::optimize(root, ir_arena, expr_arena)
    }

    #[cfg(feature = "cse")]
    if comm_subexpr_elim && !get_or_init_members!().has_ext_context {
        let mut optimizer = CommonSubExprOptimizer::new(
            opt_flags.contains(OptFlags::STREAMING) || opt_flags.contains(OptFlags::GPU),
        );
        let ir_node = IRNode::new_mutate(root);
        root = try_with_ir_arena(ir_arena, expr_arena, |arena| {
            let rewritten = ir_node.rewrite(&mut optimizer, arena)?;
            Ok(rewritten.node())
        })?;
    }

    if opt_flags.contains(OptFlags::CHECK_ORDER_OBSERVE) {
        // Every top-level plan that is not already a sink is wrapped in a temporary
        // in-memory sink before ordering simplification. `root` itself is left unchanged.
        match ir_arena.get(root) {
            IR::SinkMultiple { inputs } => {
                let mut roots = inputs.clone();
                for top in &mut roots {
                    if !matches!(ir_arena.get(*top), IR::Sink { .. }) {
                        *top = ir_arena.add(IR::Sink {
                            input: *top,
                            payload: SinkTypeIR::Memory,
                        });
                    }
                }
                simplify_ordering::simplify_and_fetch_orderings(&roots, ir_arena, expr_arena);
            },
            ir => {
                let mut tmp_top = root;
                if !matches!(ir, IR::Sink { .. }) {
                    tmp_top = ir_arena.add(IR::Sink {
                        input: root,
                        payload: SinkTypeIR::Memory,
                    });
                }
                simplify_ordering::simplify_and_fetch_orderings(&[tmp_top], ir_arena, expr_arena);
            },
        }
    }

    expand_datasets::expand_datasets(root, ir_arena, expr_arena, apply_scan_predicate_to_scan_ir)?;
    prune_parquet_metadata(root, ir_arena, expr_arena);

    // Optimization must not change the output column names.
    #[cfg(debug_assertions)]
    {
        let prev_names = prev_schema.iter_names().collect::<Vec<_>>();
        let new_schema = ir_arena.get(root).schema(ir_arena);
        let optimized_names = new_schema.iter_names().collect::<Vec<_>>();
        if optimized_names != prev_names {
            panic!(
                "{optimized_names:?} != {prev_names:?}; plan: {}",
                IRPlanRef {
                    lp_top: root,
                    lp_arena: ir_arena,
                    expr_arena,
                }
                .display()
            );
        }
    }
    Ok(root)
}
/// Return the cached [`MemberCollector`] held in `opt_members`.
///
/// On the first call (`opt_members` is `None`), the collector is first built from the plan
/// rooted at `root` and stored. Later calls return the stored collector without touching
/// `root` or the arenas.
fn _get_or_init_members<'a>(
    opt_members: &'a mut Option<MemberCollector>,
    root: Node,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
) -> &'a mut MemberCollector {
    match opt_members {
        Some(cached) => cached,
        None => {
            let mut collector = MemberCollector::new();
            collector.collect(root, ir_arena, expr_arena);
            opt_members.insert(collector)
        },
    }
}