polars-plan 0.54.3

Lazy query engine for the Polars DataFrame library
use polars_core::prelude::*;
use polars_error::feature_gated;

use crate::plans::optimizer::parquet_metadata_prune::prune_parquet_metadata;
use crate::plans::optimizer::projection_pushdown::projection_pushdown;
use crate::prelude::*;

mod delay_rechunk;

mod cluster_with_columns;
mod collapse_and_project;
mod collect_members;
#[cfg(feature = "cse")]
mod cse;
#[cfg(feature = "merge_sorted")]
mod flatten_merge_sorted;
mod flatten_union;
#[cfg(feature = "fused")]
mod fused;
mod join_utils;
pub(crate) use join_utils::ExprOrigin;
mod expand_datasets;
#[cfg(feature = "python")]
pub use expand_datasets::ExpandedPythonScan;
mod collapse_sort;
pub mod deep_copy;
mod ir_traversal;
mod parquet_metadata_prune;
mod predicate_pushdown;
mod projection_pushdown;
mod simplify_expr;
pub mod simplify_ordering;
mod slice_pushdown_expr;
mod slice_pushdown_lp;
mod sortedness;
mod stack_opt;

use collapse_and_project::SimpleProjectionAndCollapse;
#[cfg(feature = "cse")]
pub use cse::NaiveExprMerger;
use delay_rechunk::DelayRechunk;
pub use expand_datasets::ExpandedDataset;
use polars_core::config::verbose;
pub use predicate_pushdown::{
    DynamicPred, DynamicPredWeakRef, PredicateExpr, PredicatePushDown, TrivialPredicateExpr,
};
pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
use slice_pushdown_lp::SlicePushDown;
pub use sortedness::{
    AExprSorted, IRPlanSorted, IRSorted, are_keys_sorted_any, expr_is_sorted, is_sorted,
};
pub use stack_opt::{OptimizationRule, OptimizeExprContext, StackOptimizer};

#[cfg(feature = "merge_sorted")]
use self::flatten_merge_sorted::FlattenMergeSortedRule;
use self::flatten_union::FlattenUnionRule;
pub use crate::frame::{AllowedOptimizations, OptFlags};
pub use crate::plans::conversion::type_coercion::TypeCoercionRule;
#[cfg(feature = "cse")]
use crate::plans::optimizer::cse::CommonSubExprOptimizer;
#[cfg(feature = "cse")]
use crate::plans::visitor::*;
use crate::prelude::optimizer::collect_members::MemberCollector;

/// An optimization pass that operates on the DSL-level logical plan.
pub trait Optimize {
    /// Consumes `logical_plan` and returns the resulting [`DslPlan`], or an error if the
    /// pass fails.
    fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;
}

/// Upper bound on the initial capacity requested by [`init_hashmap`].
///
/// Arbitrary constant to reduce reallocation.
const HASHMAP_SIZE: usize = 16;

/// Creates an empty [`PlHashMap`] with a pre-reserved capacity.
///
/// The requested capacity is `max_len` when it is given and smaller than
/// [`HASHMAP_SIZE`]; otherwise (including `None`) it is [`HASHMAP_SIZE`].
pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {
    let capacity = match max_len {
        Some(len) if len < HASHMAP_SIZE => len,
        _ => HASHMAP_SIZE,
    };

    PlHashMap::with_capacity(capacity)
}

/// Reports whether the `POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS` environment variable is set
/// to exactly `"1"`.
///
/// Any other value, an unset variable, or a value that is not valid Unicode yields `false`.
pub(crate) fn pushdown_maintain_errors() -> bool {
    match std::env::var("POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS") {
        Ok(value) => value == "1",
        Err(_) => false,
    }
}

/// Converts a [`DslPlan`] into IR and runs the optimization passes enabled in `opt_flags`.
///
/// # Arguments
///
/// * `logical_plan` - the DSL plan to convert and optimize.
/// * `opt_flags` - selects which passes run. In eager mode both common-subexpression and
///   common-subplan elimination are cleared before conversion; the flags are also passed
///   mutably to the IR conversion, which may change them.
/// * `ir_arena` / `expr_arena` - arenas that hold the IR nodes and expressions of the plan.
/// * `scratch` - scratch buffer forwarded to `cse::set_cache_states`.
/// * `apply_scan_predicate_to_scan_ir` - callback forwarded to
///   `expand_datasets::expand_datasets`.
///
/// # Returns
///
/// The root [`Node`] of the optimized plan inside `ir_arena`.
///
/// # Errors
///
/// Propagates any error returned by the IR conversion or by one of the fallible passes.
///
/// # Panics
///
/// In builds with `debug_assertions`, panics if the column names of the optimized plan's
/// schema differ from those of the plan before optimization.
pub fn optimize(
    logical_plan: DslPlan,
    mut opt_flags: OptFlags,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    scratch: &mut Vec<Node>,
    apply_scan_predicate_to_scan_ir: fn(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
    ) -> PolarsResult<()>,
) -> PolarsResult<Node> {
    #[allow(dead_code)]
    let verbose = verbose();

    // Gradually fill the rules passed to the optimizer
    let opt = StackOptimizer {};
    let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);

    // Unset CSE: both common-subexpression and common-subplan elimination are cleared
    // in eager mode.
    // This can be turned on again during ir-conversion.
    #[cfg(feature = "cse")]
    if opt_flags.contains(OptFlags::EAGER) {
        opt_flags &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBPLAN_ELIM);
    }
    let mut root = to_alp(logical_plan, expr_arena, ir_arena, &mut opt_flags)?;

    #[allow(unused_assignments)]
    let mut comm_subplan_elim = false;
    // Don't run optimizations that don't make sense on a single node.
    // This keeps eager execution more snappy.
    #[cfg(feature = "cse")]
    {
        comm_subplan_elim = opt_flags.contains(OptFlags::COMM_SUBPLAN_ELIM);
    }

    #[cfg(feature = "cse")]
    let comm_subexpr_elim = opt_flags.contains(OptFlags::COMM_SUBEXPR_ELIM);
    #[cfg(not(feature = "cse"))]
    let comm_subexpr_elim = false;

    // Note: This can be in opt_flags in the future if needed.
    let pushdown_maintain_errors = pushdown_maintain_errors();

    // During debug we check if the optimizations have not modified the final schema.
    #[cfg(debug_assertions)]
    let prev_schema = ir_arena.get(root).schema(ir_arena).into_owned();

    // Lazily populated by `get_or_init_members!` the first time a pass needs it.
    let mut _opt_members: &mut Option<MemberCollector> = &mut None;

    macro_rules! get_or_init_members {
        () => {
            _get_or_init_members(_opt_members, root, ir_arena, expr_arena)
        };
    }

    // Run before slice pushdown
    if opt_flags.simplify_expr() {
        #[cfg(feature = "fused")]
        rules.push(Box::new(fused::FusedArithmetic {}));
    }

    #[cfg(feature = "cse")]
    let mut run_set_cache_states = false;

    if comm_subplan_elim {
        feature_gated!("cse", {
            let members = get_or_init_members!();
            if (members.has_sink_multiple || members.has_joins_or_unions)
                && members.has_duplicate_scans()
                && !members.has_cache
            {
                if verbose {
                    eprintln!("found multiple sources; run comm_subplan_elim")
                }

                run_set_cache_states = cse::cspe::common_subplan_elimination(
                    root,
                    ir_arena,
                    expr_arena,
                    polars_config::config().allow_nested_cspe(),
                );
            }
        });
    };

    let mut repeat_slice_pd_after_filter_pd = false;

    if opt_flags.slice_pushdown() {
        let mut slice_pushdown_opt = SlicePushDown::new();
        let ir = slice_pushdown_opt.optimize(root, ir_arena, expr_arena)?;

        ir_arena.replace(root, ir);

        // Remember whether the slice pass has to be repeated after the rule loop below.
        repeat_slice_pd_after_filter_pd = slice_pushdown_opt.slice_node_in_optimized_plan;
    }

    // Should be run before projection pushdown.
    // This allows columns only needed for filters to be dropped early.
    if opt_flags.predicate_pushdown() {
        let mut predicate_pushdown_opt =
            PredicatePushDown::new(pushdown_maintain_errors, opt_flags.streaming());
        let ir = ir_arena.take(root);
        let ir = predicate_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;
        ir_arena.replace(root, ir);
    }

    #[cfg(feature = "cse")]
    if run_set_cache_states {
        cse::set_cache_states(
            root,
            ir_arena,
            expr_arena,
            scratch,
            verbose,
            pushdown_maintain_errors,
            opt_flags.streaming(),
        )?;
    }

    // Must run before projection pushdown, as that can insert simple-projections between the sorts
    // on unused columns.
    if opt_flags.contains(OptFlags::SORT_COLLAPSE) {
        root = opt.optimize_loop(
            &mut [Box::new(collapse_sort::CollapseSort {}) as _],
            expr_arena,
            ir_arena,
            root,
        )?;
    }

    if opt_flags.projection_pushdown() {
        projection_pushdown(root, ir_arena, expr_arena);
    }

    if opt_flags.fast_projection() {
        rules.push(Box::new(SimpleProjectionAndCollapse::new(
            opt_flags.eager(),
        )));
    }

    if !opt_flags.eager() {
        rules.push(Box::new(DelayRechunk::new()));
    }

    // This optimization removes branches, so we must do it when type coercion
    // is completed.
    if opt_flags.simplify_expr() {
        rules.push(Box::new(SimplifyBooleanRule {}));
    }

    if !opt_flags.eager() {
        #[cfg(feature = "merge_sorted")]
        rules.push(Box::new(FlattenMergeSortedRule::new()));
        rules.push(Box::new(FlattenUnionRule {}));
    }

    // Run all rules collected above in a single optimizer loop.
    root = opt.optimize_loop(&mut rules, expr_arena, ir_arena, root)?;

    if repeat_slice_pd_after_filter_pd {
        let mut slice_pushdown_opt = SlicePushDown::new();
        let ir = slice_pushdown_opt.optimize(root, ir_arena, expr_arena)?;

        ir_arena.replace(root, ir);
    }

    if opt_flags.cluster_with_columns() && get_or_init_members!().with_columns_count > 1 {
        cluster_with_columns::optimize(root, ir_arena, expr_arena)
    }

    // This one should run (nearly) last as this modifies the projections
    #[cfg(feature = "cse")]
    if comm_subexpr_elim && !get_or_init_members!().has_ext_context {
        let mut optimizer = CommonSubExprOptimizer::new(
            opt_flags.contains(OptFlags::STREAMING) | opt_flags.contains(OptFlags::GPU),
        );
        let ir_node = IRNode::new_mutate(root);

        root = try_with_ir_arena(ir_arena, expr_arena, |arena| {
            let rewritten = ir_node.rewrite(&mut optimizer, arena)?;
            Ok(rewritten.node())
        })?;
    }

    if opt_flags.contains(OptFlags::CHECK_ORDER_OBSERVE) {
        // The ordering pass is handed `Sink` nodes: any root that is not already a sink is
        // wrapped in a temporary in-memory sink. `root` itself is left unchanged.
        match ir_arena.get(root) {
            IR::SinkMultiple { inputs } => {
                let mut roots = inputs.clone();
                for root in &mut roots {
                    if !matches!(ir_arena.get(*root), IR::Sink { .. }) {
                        *root = ir_arena.add(IR::Sink {
                            input: *root,
                            payload: SinkTypeIR::Memory,
                        });
                    }
                }
                simplify_ordering::simplify_and_fetch_orderings(&roots, ir_arena, expr_arena);
            },
            ir => {
                let mut tmp_top = root;
                if !matches!(ir, IR::Sink { .. }) {
                    tmp_top = ir_arena.add(IR::Sink {
                        input: root,
                        payload: SinkTypeIR::Memory,
                    });
                }
                simplify_ordering::simplify_and_fetch_orderings(&[tmp_top], ir_arena, expr_arena);
            },
        }
    }

    expand_datasets::expand_datasets(root, ir_arena, expr_arena, apply_scan_predicate_to_scan_ir)?;

    prune_parquet_metadata(root, ir_arena, expr_arena);

    // During debug we check if the optimizations have not modified the final schema.
    #[cfg(debug_assertions)]
    {
        // only check by names because we may supercast types.
        let prev_names = prev_schema.iter_names().collect::<Vec<_>>();
        let new_schema = ir_arena.get(root).schema(ir_arena);
        let optimized_names = new_schema.iter_names().collect::<Vec<_>>();

        if optimized_names != prev_names {
            panic!(
                "{optimized_names:?} != {prev_names:?}; plan: {}",
                IRPlanRef {
                    lp_top: root,
                    lp_arena: ir_arena,
                    expr_arena,
                }
                .display()
            );
        }
    };

    Ok(root)
}

/// Returns the cached [`MemberCollector`] stored in `opt_members`, building it first if
/// the slot is still empty.
///
/// When the slot is `None`, a new collector is created and `collect` is called on it with
/// `root` and both arenas before it is stored; an already-populated slot is returned
/// unchanged without collecting again.
fn _get_or_init_members<'a>(
    opt_members: &'a mut Option<MemberCollector>,
    root: Node,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
) -> &'a mut MemberCollector {
    if opt_members.is_none() {
        let mut collector = MemberCollector::new();
        collector.collect(root, ir_arena, expr_arena);
        *opt_members = Some(collector);
    }

    opt_members
        .as_mut()
        .expect("member collector is populated by the branch above")
}