polars_plan/plans/optimizer/
mod.rs1use polars_core::prelude::*;
2
3use crate::prelude::*;
4
5mod cache_states;
6mod delay_rechunk;
7
8mod cluster_with_columns;
9mod collapse_and_project;
10mod collapse_joins;
11mod collect_members;
12mod count_star;
13#[cfg(feature = "cse")]
14mod cse;
15mod flatten_union;
16#[cfg(feature = "fused")]
17mod fused;
18mod join_utils;
19pub(crate) use join_utils::ExprOrigin;
20mod expand_datasets;
21mod predicate_pushdown;
22mod projection_pushdown;
23mod set_order;
24mod simplify_expr;
25mod slice_pushdown_expr;
26mod slice_pushdown_lp;
27mod stack_opt;
28
29use collapse_and_project::SimpleProjectionAndCollapse;
30#[cfg(feature = "cse")]
31pub use cse::NaiveExprMerger;
32use delay_rechunk::DelayRechunk;
33pub use expand_datasets::ExpandedDataset;
34use polars_core::config::verbose;
35use polars_io::predicates::PhysicalIoExpr;
36pub use predicate_pushdown::PredicatePushDown;
37pub use projection_pushdown::ProjectionPushDown;
38pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
39use slice_pushdown_lp::SlicePushDown;
40pub use stack_opt::{OptimizationRule, OptimizeExprContext, StackOptimizer};
41
42use self::flatten_union::FlattenUnionRule;
43use self::set_order::set_order_flags;
44pub use crate::frame::{AllowedOptimizations, OptFlags};
45pub use crate::plans::conversion::type_coercion::TypeCoercionRule;
46use crate::plans::optimizer::count_star::CountStar;
47#[cfg(feature = "cse")]
48use crate::plans::optimizer::cse::CommonSubExprOptimizer;
49#[cfg(feature = "cse")]
50use crate::plans::optimizer::cse::prune_unused_caches;
51use crate::plans::optimizer::predicate_pushdown::ExprEval;
52#[cfg(feature = "cse")]
53use crate::plans::visitor::*;
54use crate::prelude::optimizer::collect_members::MemberCollector;
55
56pub trait Optimize {
57 fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;
58}
59
60const HASHMAP_SIZE: usize = 16;
62
63pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {
64 PlHashMap::with_capacity(std::cmp::min(max_len.unwrap_or(HASHMAP_SIZE), HASHMAP_SIZE))
65}
66
/// Reports whether the `POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS` environment variable
/// is set to exactly `"1"`.
///
/// Any other value, an unset variable, or a non-Unicode value yields `false`.
pub(crate) fn pushdown_maintain_errors() -> bool {
    let flag = std::env::var("POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS");
    matches!(flag.as_deref(), Ok("1"))
}
70
71pub fn optimize(
72 logical_plan: DslPlan,
73 mut opt_flags: OptFlags,
74 lp_arena: &mut Arena<IR>,
75 expr_arena: &mut Arena<AExpr>,
76 scratch: &mut Vec<Node>,
77 expr_eval: ExprEval<'_>,
78) -> PolarsResult<Node> {
79 #[allow(dead_code)]
80 let verbose = verbose();
81
82 let opt = StackOptimizer {};
84 let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);
85
86 #[allow(clippy::eq_op)]
89 #[cfg(feature = "cse")]
90 if opt_flags.contains(OptFlags::EAGER) {
91 opt_flags &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBEXPR_ELIM);
92 }
93 let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_flags)?;
94
95 #[cfg(feature = "cse")]
98 let comm_subplan_elim = opt_flags.contains(OptFlags::COMM_SUBPLAN_ELIM);
99
100 #[cfg(feature = "cse")]
101 let comm_subexpr_elim = opt_flags.contains(OptFlags::COMM_SUBEXPR_ELIM);
102 #[cfg(not(feature = "cse"))]
103 let comm_subexpr_elim = false;
104
105 let pushdown_maintain_errors = pushdown_maintain_errors();
107
108 #[cfg(debug_assertions)]
110 let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();
111
112 let mut _opt_members = &mut None;
113
114 macro_rules! get_or_init_members {
115 () => {
116 _get_or_init_members(_opt_members, lp_top, lp_arena, expr_arena)
117 };
118 }
119
120 macro_rules! get_members_opt {
121 () => {
122 _opt_members.as_mut()
123 };
124 }
125
126 if opt_flags.contains(OptFlags::CHECK_ORDER_OBSERVE) {
128 let members = get_or_init_members!();
129 if members.has_group_by | members.has_sort | members.has_distinct {
130 set_order_flags(lp_top, lp_arena, expr_arena, scratch);
131 }
132 }
133
134 if opt_flags.simplify_expr() {
135 #[cfg(feature = "fused")]
136 rules.push(Box::new(fused::FusedArithmetic {}));
137 }
138
139 #[cfg(feature = "cse")]
140 let _cse_plan_changed = if comm_subplan_elim {
141 let members = get_or_init_members!();
142 if (members.has_sink_multiple || members.has_joins_or_unions)
143 && members.has_duplicate_scans()
144 && !members.has_cache
145 {
146 if verbose {
147 eprintln!("found multiple sources; run comm_subplan_elim")
148 }
149
150 let (lp, changed, cid2c) = cse::elim_cmn_subplans(lp_top, lp_arena, expr_arena);
151
152 prune_unused_caches(lp_arena, cid2c);
153
154 lp_top = lp;
155 members.has_cache |= changed;
156 changed
157 } else {
158 false
159 }
160 } else {
161 false
162 };
163 #[cfg(not(feature = "cse"))]
164 let _cse_plan_changed = false;
165
166 if opt_flags.projection_pushdown() {
168 let mut projection_pushdown_opt = ProjectionPushDown::new();
169 let alp = lp_arena.take(lp_top);
170 let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
171 lp_arena.replace(lp_top, alp);
172
173 if projection_pushdown_opt.is_count_star {
174 let mut count_star_opt = CountStar::new();
175 count_star_opt.optimize_plan(lp_arena, expr_arena, lp_top)?;
176 }
177 }
178
179 if opt_flags.predicate_pushdown() {
180 let mut predicate_pushdown_opt = PredicatePushDown::new(
181 expr_eval,
182 pushdown_maintain_errors,
183 opt_flags.new_streaming(),
184 );
185 let alp = lp_arena.take(lp_top);
186 let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
187 lp_arena.replace(lp_top, alp);
188 }
189
190 if opt_flags.collapse_joins() && get_or_init_members!().has_filter_with_join_input {
192 collapse_joins::optimize(lp_top, lp_arena, expr_arena, opt_flags.new_streaming());
193 }
194
195 if opt_flags.fast_projection() {
197 rules.push(Box::new(SimpleProjectionAndCollapse::new(
198 opt_flags.eager(),
199 )));
200 }
201
202 if !opt_flags.eager() {
203 rules.push(Box::new(DelayRechunk::new()));
204 }
205
206 if opt_flags.slice_pushdown() {
207 let mut slice_pushdown_opt = SlicePushDown::new(
208 false, opt_flags.new_streaming(),
214 );
215 let alp = lp_arena.take(lp_top);
216 let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
217
218 lp_arena.replace(lp_top, alp);
219
220 rules.push(Box::new(slice_pushdown_opt));
222 }
223
224 if opt_flags.simplify_expr() {
227 rules.push(Box::new(SimplifyBooleanRule {}));
228 }
229
230 if !opt_flags.eager() {
231 rules.push(Box::new(FlattenUnionRule {}));
232 }
233
234 rules.push(Box::new(expand_datasets::ExpandDatasets {}) as Box<dyn OptimizationRule>);
236
237 lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top)?;
238
239 if opt_flags.cluster_with_columns() {
240 cluster_with_columns::optimize(lp_top, lp_arena, expr_arena)
241 }
242
243 if _cse_plan_changed
244 && get_members_opt!()
245 .is_some_and(|members| members.has_joins_or_unions && members.has_cache)
246 {
247 cache_states::set_cache_states(
249 lp_top,
250 lp_arena,
251 expr_arena,
252 scratch,
253 expr_eval,
254 verbose,
255 pushdown_maintain_errors,
256 opt_flags.new_streaming(),
257 )?;
258 }
259
260 #[cfg(feature = "cse")]
262 if comm_subexpr_elim && !get_or_init_members!().has_ext_context {
263 let mut optimizer = CommonSubExprOptimizer::new();
264 let alp_node = IRNode::new_mutate(lp_top);
265
266 lp_top = try_with_ir_arena(lp_arena, expr_arena, |arena| {
267 let rewritten = alp_node.rewrite(&mut optimizer, arena)?;
268 Ok(rewritten.node())
269 })?;
270 }
271
272 #[cfg(debug_assertions)]
274 {
275 assert_eq!(
277 prev_schema.iter_names().collect::<Vec<_>>(),
278 lp_arena
279 .get(lp_top)
280 .schema(lp_arena)
281 .iter_names()
282 .collect::<Vec<_>>()
283 );
284 };
285
286 Ok(lp_top)
287}
288
289fn _get_or_init_members<'a>(
290 opt_members: &'a mut Option<MemberCollector>,
291 lp_top: Node,
292 lp_arena: &mut Arena<IR>,
293 expr_arena: &mut Arena<AExpr>,
294) -> &'a mut MemberCollector {
295 opt_members.get_or_insert_with(|| {
296 let mut members = MemberCollector::new();
297 members.collect(lp_top, lp_arena, expr_arena);
298
299 members
300 })
301}