polars_plan/plans/optimizer/
mod.rs

1use polars_core::prelude::*;
2
3use crate::prelude::*;
4
5mod cache_states;
6mod delay_rechunk;
7
8mod cluster_with_columns;
9mod collapse_and_project;
10mod collapse_joins;
11mod collect_members;
12mod count_star;
13#[cfg(feature = "cse")]
14mod cse;
15mod flatten_union;
16#[cfg(feature = "fused")]
17mod fused;
18mod join_utils;
19pub(crate) use join_utils::ExprOrigin;
20mod expand_datasets;
21mod predicate_pushdown;
22mod projection_pushdown;
23mod set_order;
24mod simplify_expr;
25mod slice_pushdown_expr;
26mod slice_pushdown_lp;
27mod stack_opt;
28
29use collapse_and_project::SimpleProjectionAndCollapse;
30#[cfg(feature = "cse")]
31pub use cse::NaiveExprMerger;
32use delay_rechunk::DelayRechunk;
33pub use expand_datasets::ExpandedDataset;
34use polars_core::config::verbose;
35use polars_io::predicates::PhysicalIoExpr;
36pub use predicate_pushdown::PredicatePushDown;
37pub use projection_pushdown::ProjectionPushDown;
38pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
39use slice_pushdown_lp::SlicePushDown;
40pub use stack_opt::{OptimizationRule, OptimizeExprContext, StackOptimizer};
41
42use self::flatten_union::FlattenUnionRule;
43use self::set_order::set_order_flags;
44pub use crate::frame::{AllowedOptimizations, OptFlags};
45pub use crate::plans::conversion::type_coercion::TypeCoercionRule;
46use crate::plans::optimizer::count_star::CountStar;
47#[cfg(feature = "cse")]
48use crate::plans::optimizer::cse::CommonSubExprOptimizer;
49#[cfg(feature = "cse")]
50use crate::plans::optimizer::cse::prune_unused_caches;
51use crate::plans::optimizer::predicate_pushdown::ExprEval;
52#[cfg(feature = "cse")]
53use crate::plans::visitor::*;
54use crate::prelude::optimizer::collect_members::MemberCollector;
55
56pub trait Optimize {
57    fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;
58}
59
// Arbitrary constant to reduce reallocation. `init_hashmap` uses it both as the
// default initial capacity and as the upper bound for a requested capacity.
const HASHMAP_SIZE: usize = 16;
62
63pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {
64    PlHashMap::with_capacity(std::cmp::min(max_len.unwrap_or(HASHMAP_SIZE), HASHMAP_SIZE))
65}
66
/// Reports whether the `POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS` environment variable is set to
/// exactly `"1"`.
///
/// Any other value, an unset variable, or a value that is not valid Unicode yields `false`.
/// The environment is read on every call.
pub(crate) fn pushdown_maintain_errors() -> bool {
    matches!(
        std::env::var("POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS").as_deref(),
        Ok("1")
    )
}
70
71pub fn optimize(
72    logical_plan: DslPlan,
73    mut opt_flags: OptFlags,
74    lp_arena: &mut Arena<IR>,
75    expr_arena: &mut Arena<AExpr>,
76    scratch: &mut Vec<Node>,
77    expr_eval: ExprEval<'_>,
78) -> PolarsResult<Node> {
79    #[allow(dead_code)]
80    let verbose = verbose();
81
82    // Gradually fill the rules passed to the optimizer
83    let opt = StackOptimizer {};
84    let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);
85
86    // Unset CSE
87    // This can be turned on again during ir-conversion.
88    #[allow(clippy::eq_op)]
89    #[cfg(feature = "cse")]
90    if opt_flags.contains(OptFlags::EAGER) {
91        opt_flags &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBEXPR_ELIM);
92    }
93    let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_flags)?;
94
95    // Don't run optimizations that don't make sense on a single node.
96    // This keeps eager execution more snappy.
97    #[cfg(feature = "cse")]
98    let comm_subplan_elim = opt_flags.contains(OptFlags::COMM_SUBPLAN_ELIM);
99
100    #[cfg(feature = "cse")]
101    let comm_subexpr_elim = opt_flags.contains(OptFlags::COMM_SUBEXPR_ELIM);
102    #[cfg(not(feature = "cse"))]
103    let comm_subexpr_elim = false;
104
105    // Note: This can be in opt_flags in the future if needed.
106    let pushdown_maintain_errors = pushdown_maintain_errors();
107
108    // During debug we check if the optimizations have not modified the final schema.
109    #[cfg(debug_assertions)]
110    let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();
111
112    let mut _opt_members = &mut None;
113
114    macro_rules! get_or_init_members {
115        () => {
116            _get_or_init_members(_opt_members, lp_top, lp_arena, expr_arena)
117        };
118    }
119
120    macro_rules! get_members_opt {
121        () => {
122            _opt_members.as_mut()
123        };
124    }
125
126    // Run before slice pushdown
127    if opt_flags.contains(OptFlags::CHECK_ORDER_OBSERVE) {
128        let members = get_or_init_members!();
129        if members.has_group_by | members.has_sort | members.has_distinct {
130            set_order_flags(lp_top, lp_arena, expr_arena, scratch);
131        }
132    }
133
134    if opt_flags.simplify_expr() {
135        #[cfg(feature = "fused")]
136        rules.push(Box::new(fused::FusedArithmetic {}));
137    }
138
139    #[cfg(feature = "cse")]
140    let _cse_plan_changed = if comm_subplan_elim {
141        let members = get_or_init_members!();
142        if (members.has_sink_multiple || members.has_joins_or_unions)
143            && members.has_duplicate_scans()
144            && !members.has_cache
145        {
146            if verbose {
147                eprintln!("found multiple sources; run comm_subplan_elim")
148            }
149
150            let (lp, changed, cid2c) = cse::elim_cmn_subplans(lp_top, lp_arena, expr_arena);
151
152            prune_unused_caches(lp_arena, cid2c);
153
154            lp_top = lp;
155            members.has_cache |= changed;
156            changed
157        } else {
158            false
159        }
160    } else {
161        false
162    };
163    #[cfg(not(feature = "cse"))]
164    let _cse_plan_changed = false;
165
166    // Should be run before predicate pushdown.
167    if opt_flags.projection_pushdown() {
168        let mut projection_pushdown_opt = ProjectionPushDown::new();
169        let alp = lp_arena.take(lp_top);
170        let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
171        lp_arena.replace(lp_top, alp);
172
173        if projection_pushdown_opt.is_count_star {
174            let mut count_star_opt = CountStar::new();
175            count_star_opt.optimize_plan(lp_arena, expr_arena, lp_top)?;
176        }
177    }
178
179    if opt_flags.predicate_pushdown() {
180        let mut predicate_pushdown_opt = PredicatePushDown::new(
181            expr_eval,
182            pushdown_maintain_errors,
183            opt_flags.new_streaming(),
184        );
185        let alp = lp_arena.take(lp_top);
186        let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
187        lp_arena.replace(lp_top, alp);
188    }
189
190    // Make sure it is after predicate pushdown
191    if opt_flags.collapse_joins() && get_or_init_members!().has_filter_with_join_input {
192        collapse_joins::optimize(lp_top, lp_arena, expr_arena, opt_flags.new_streaming());
193    }
194
195    // Make sure its before slice pushdown.
196    if opt_flags.fast_projection() {
197        rules.push(Box::new(SimpleProjectionAndCollapse::new(
198            opt_flags.eager(),
199        )));
200    }
201
202    if !opt_flags.eager() {
203        rules.push(Box::new(DelayRechunk::new()));
204    }
205
206    if opt_flags.slice_pushdown() {
207        let mut slice_pushdown_opt = SlicePushDown::new(
208            // We don't maintain errors on slice as the behavior is much more predictable that way.
209            //
210            // Even if we enable maintain_errors (thereby preventing the slice from being pushed),
211            // the new-streaming engine still may not error due to early-stopping.
212            false, // maintain_errors
213            opt_flags.new_streaming(),
214        );
215        let alp = lp_arena.take(lp_top);
216        let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
217
218        lp_arena.replace(lp_top, alp);
219
220        // Expressions use the stack optimizer.
221        rules.push(Box::new(slice_pushdown_opt));
222    }
223
224    // This optimization removes branches, so we must do it when type coercion
225    // is completed.
226    if opt_flags.simplify_expr() {
227        rules.push(Box::new(SimplifyBooleanRule {}));
228    }
229
230    if !opt_flags.eager() {
231        rules.push(Box::new(FlattenUnionRule {}));
232    }
233
234    // Note: ExpandDatasets must run after slice and predicate pushdown.
235    rules.push(Box::new(expand_datasets::ExpandDatasets {}) as Box<dyn OptimizationRule>);
236
237    lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top)?;
238
239    if opt_flags.cluster_with_columns() {
240        cluster_with_columns::optimize(lp_top, lp_arena, expr_arena)
241    }
242
243    if _cse_plan_changed
244        && get_members_opt!()
245            .is_some_and(|members| members.has_joins_or_unions && members.has_cache)
246    {
247        // We only want to run this on cse inserted caches
248        cache_states::set_cache_states(
249            lp_top,
250            lp_arena,
251            expr_arena,
252            scratch,
253            expr_eval,
254            verbose,
255            pushdown_maintain_errors,
256            opt_flags.new_streaming(),
257        )?;
258    }
259
260    // This one should run (nearly) last as this modifies the projections
261    #[cfg(feature = "cse")]
262    if comm_subexpr_elim && !get_or_init_members!().has_ext_context {
263        let mut optimizer = CommonSubExprOptimizer::new();
264        let alp_node = IRNode::new_mutate(lp_top);
265
266        lp_top = try_with_ir_arena(lp_arena, expr_arena, |arena| {
267            let rewritten = alp_node.rewrite(&mut optimizer, arena)?;
268            Ok(rewritten.node())
269        })?;
270    }
271
272    // During debug we check if the optimizations have not modified the final schema.
273    #[cfg(debug_assertions)]
274    {
275        // only check by names because we may supercast types.
276        assert_eq!(
277            prev_schema.iter_names().collect::<Vec<_>>(),
278            lp_arena
279                .get(lp_top)
280                .schema(lp_arena)
281                .iter_names()
282                .collect::<Vec<_>>()
283        );
284    };
285
286    Ok(lp_top)
287}
288
289fn _get_or_init_members<'a>(
290    opt_members: &'a mut Option<MemberCollector>,
291    lp_top: Node,
292    lp_arena: &mut Arena<IR>,
293    expr_arena: &mut Arena<AExpr>,
294) -> &'a mut MemberCollector {
295    opt_members.get_or_insert_with(|| {
296        let mut members = MemberCollector::new();
297        members.collect(lp_top, lp_arena, expr_arena);
298
299        members
300    })
301}