1use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{assert_expected_schema, InvariantLevel};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_nested_union::EliminateNestedUnion;
45use crate::eliminate_one_union::EliminateOneUnion;
46use crate::eliminate_outer_join::EliminateOuterJoin;
47use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
48use crate::filter_null_join_keys::FilterNullJoinKeys;
49use crate::optimize_projections::OptimizeProjections;
50use crate::plan_signature::LogicalPlanSignature;
51use crate::propagate_empty_relation::PropagateEmptyRelation;
52use crate::push_down_filter::PushDownFilter;
53use crate::push_down_limit::PushDownLimit;
54use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
56use crate::simplify_expressions::SimplifyExpressions;
57use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
58use crate::utils::log_plan;
59
60pub trait OptimizerRule: Debug {
73 fn name(&self) -> &str;
75
76 fn apply_order(&self) -> Option<ApplyOrder> {
81 None
82 }
83
84 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
86 fn supports_rewrite(&self) -> bool {
87 true
88 }
89
90 fn rewrite(
93 &self,
94 _plan: LogicalPlan,
95 _config: &dyn OptimizerConfig,
96 ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
97 internal_err!("rewrite is not implemented for {}", self.name())
98 }
99}
100
101pub trait OptimizerConfig {
103 fn query_execution_start_time(&self) -> DateTime<Utc>;
106
107 fn alias_generator(&self) -> &Arc<AliasGenerator>;
109
110 fn options(&self) -> &ConfigOptions;
111
112 fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
113 None
114 }
115}
116
117#[derive(Debug)]
120pub struct OptimizerContext {
121 query_execution_start_time: DateTime<Utc>,
124
125 alias_generator: Arc<AliasGenerator>,
127
128 options: ConfigOptions,
129}
130
131impl OptimizerContext {
132 pub fn new() -> Self {
134 let mut options = ConfigOptions::default();
135 options.optimizer.filter_null_join_keys = true;
136
137 Self {
138 query_execution_start_time: Utc::now(),
139 alias_generator: Arc::new(AliasGenerator::new()),
140 options,
141 }
142 }
143
144 pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
146 self.options.optimizer.filter_null_join_keys = filter_null_keys;
147 self
148 }
149
150 pub fn with_query_execution_start_time(
153 mut self,
154 query_execution_tart_time: DateTime<Utc>,
155 ) -> Self {
156 self.query_execution_start_time = query_execution_tart_time;
157 self
158 }
159
160 pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
163 self.options.optimizer.skip_failed_rules = b;
164 self
165 }
166
167 pub fn with_max_passes(mut self, v: u8) -> Self {
169 self.options.optimizer.max_passes = v as usize;
170 self
171 }
172}
173
174impl Default for OptimizerContext {
175 fn default() -> Self {
177 Self::new()
178 }
179}
180
181impl OptimizerConfig for OptimizerContext {
182 fn query_execution_start_time(&self) -> DateTime<Utc> {
183 self.query_execution_start_time
184 }
185
186 fn alias_generator(&self) -> &Arc<AliasGenerator> {
187 &self.alias_generator
188 }
189
190 fn options(&self) -> &ConfigOptions {
191 &self.options
192 }
193}
194
195#[derive(Clone, Debug)]
197pub struct Optimizer {
198 pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
200}
201
/// Direction in which the optimizer applies a rule while walking a plan.
///
/// `Eq` is derived alongside `PartialEq` because equality on this fieldless
/// enum is total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOrder {
    /// Apply the rule to a node on the way down the traversal.
    TopDown,
    /// Apply the rule to a node on the way back up the traversal.
    BottomUp,
}
213
214impl Default for Optimizer {
215 fn default() -> Self {
216 Self::new()
217 }
218}
219
220impl Optimizer {
221 pub fn new() -> Self {
223 let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
224 Arc::new(EliminateNestedUnion::new()),
225 Arc::new(SimplifyExpressions::new()),
226 Arc::new(ReplaceDistinctWithAggregate::new()),
227 Arc::new(EliminateJoin::new()),
228 Arc::new(DecorrelatePredicateSubquery::new()),
229 Arc::new(ScalarSubqueryToJoin::new()),
230 Arc::new(DecorrelateLateralJoin::new()),
231 Arc::new(ExtractEquijoinPredicate::new()),
232 Arc::new(EliminateDuplicatedExpr::new()),
233 Arc::new(EliminateFilter::new()),
234 Arc::new(EliminateCrossJoin::new()),
235 Arc::new(EliminateLimit::new()),
236 Arc::new(PropagateEmptyRelation::new()),
237 Arc::new(EliminateOneUnion::new()),
239 Arc::new(FilterNullJoinKeys::default()),
240 Arc::new(EliminateOuterJoin::new()),
241 Arc::new(PushDownLimit::new()),
243 Arc::new(PushDownFilter::new()),
244 Arc::new(SingleDistinctToGroupBy::new()),
245 Arc::new(EliminateGroupByConstant::new()),
248 Arc::new(CommonSubexprEliminate::new()),
249 Arc::new(OptimizeProjections::new()),
250 ];
251
252 Self::with_rules(rules)
253 }
254
255 pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
257 Self { rules }
258 }
259}
260
261struct Rewriter<'a> {
263 apply_order: ApplyOrder,
264 rule: &'a dyn OptimizerRule,
265 config: &'a dyn OptimizerConfig,
266}
267
268impl<'a> Rewriter<'a> {
269 fn new(
270 apply_order: ApplyOrder,
271 rule: &'a dyn OptimizerRule,
272 config: &'a dyn OptimizerConfig,
273 ) -> Self {
274 Self {
275 apply_order,
276 rule,
277 config,
278 }
279 }
280}
281
282impl TreeNodeRewriter for Rewriter<'_> {
283 type Node = LogicalPlan;
284
285 fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
286 if self.apply_order == ApplyOrder::TopDown {
287 {
288 self.rule.rewrite(node, self.config)
289 }
290 } else {
291 Ok(Transformed::no(node))
292 }
293 }
294
295 fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
296 if self.apply_order == ApplyOrder::BottomUp {
297 {
298 self.rule.rewrite(node, self.config)
299 }
300 } else {
301 Ok(Transformed::no(node))
302 }
303 }
304}
305
306impl Optimizer {
307 pub fn optimize<F>(
310 &self,
311 plan: LogicalPlan,
312 config: &dyn OptimizerConfig,
313 mut observer: F,
314 ) -> Result<LogicalPlan>
315 where
316 F: FnMut(&LogicalPlan, &dyn OptimizerRule),
317 {
318 plan.check_invariants(InvariantLevel::Executable)
320 .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
321
322 let start_time = Instant::now();
323 let options = config.options();
324 let mut new_plan = plan;
325
326 let mut previous_plans = HashSet::with_capacity(16);
327 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
328
329 let starting_schema = Arc::clone(new_plan.schema());
330
331 let mut i = 0;
332 while i < options.optimizer.max_passes {
333 log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
334
335 for rule in &self.rules {
336 let prev_plan = options
339 .optimizer
340 .skip_failed_rules
341 .then(|| new_plan.clone());
342
343 let starting_schema = Arc::clone(new_plan.schema());
344
345 let result = match rule.apply_order() {
346 Some(apply_order) => new_plan.rewrite_with_subqueries(
348 &mut Rewriter::new(apply_order, rule.as_ref(), config),
349 ),
350 None => {
352 rule.rewrite(new_plan, config)
353 },
354 }
355 .and_then(|tnr| {
356 assert_valid_optimization(&tnr.data, &starting_schema)
358 .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
359
360 #[cfg(debug_assertions)]
362 tnr.data.check_invariants(InvariantLevel::Executable)
363 .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
364
365 Ok(tnr)
366 });
367
368 match (result, prev_plan) {
370 (
372 Ok(Transformed {
373 data, transformed, ..
374 }),
375 _,
376 ) => {
377 new_plan = data;
378 observer(&new_plan, rule.as_ref());
379 if transformed {
380 log_plan(rule.name(), &new_plan);
381 } else {
382 debug!(
383 "Plan unchanged by optimizer rule '{}' (pass {})",
384 rule.name(),
385 i
386 );
387 }
388 }
389 (Err(e), Some(orig_plan)) => {
392 warn!(
396 "Skipping optimizer rule '{}' due to unexpected error: {}",
397 rule.name(),
398 e
399 );
400 new_plan = orig_plan;
401 }
402 (Err(e), None) => {
404 return Err(e.context(format!(
405 "Optimizer rule '{}' failed",
406 rule.name()
407 )));
408 }
409 }
410 }
411 log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
412
413 let plan_is_fresh =
415 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
416 if !plan_is_fresh {
417 debug!("optimizer pass {i} did not make changes");
419 break;
420 }
421 i += 1;
422 }
423
424 assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
426 e.context("Check optimizer-specific invariants after all passes")
427 })?;
428
429 new_plan
431 .check_invariants(InvariantLevel::Executable)
432 .map_err(|e| {
433 e.context("Invalid (non-executable) plan after LP Optimizers")
434 })?;
435
436 log_plan("Final optimized plan", &new_plan);
437 debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
438 Ok(new_plan)
439 }
440}
441
442fn assert_valid_optimization(
447 plan: &LogicalPlan,
448 prev_schema: &Arc<DFSchema>,
449) -> Result<()> {
450 assert_expected_schema(prev_schema, plan)?;
453
454 Ok(())
455}
456
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// Shared fixture: an `EmptyRelation` with an empty schema that produces
    /// no rows.
    fn empty_relation_plan() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    /// A failing rule is tolerated when `skip_failed_rules` is on.
    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = empty_relation_plan();
        opt.optimize(plan, &config, &observe).unwrap();
    }

    /// A failing rule aborts optimization when `skip_failed_rules` is off,
    /// and the error names the rule.
    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = empty_relation_plan();
        let err = opt.optimize(plan, &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    /// A rule that changes the plan's schema is reported as an error.
    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = empty_relation_plan();
        let err = opt.optimize(plan, &config, &observe).unwrap_err();

        // Only the stable prefix of the message is asserted.
        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    /// The schema-changing rule is skipped when `skip_failed_rules` is on.
    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = empty_relation_plan();
        opt.optimize(plan, &config, &observe).unwrap();
    }

    /// A rule whose output differs from the input only in field metadata is
    /// accepted by the optimizer's schema check.
    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        // Same columns as the table scan, but every field carries metadata.
        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // Before optimization the schemas differ (by metadata only).
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // The rule replaced the plan with a bare table scan, so the
        // optimized schema equals the metadata-free input schema.
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    /// Rotating a three-expression projection returns to the initial plan
    /// after three passes; the optimizer must notice and stop.
    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        // The outer single-expression projection is never rotated, so the
        // root schema stays fixed while the inner projection changes.
        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // One observer call per pass: three rotations, then the cycle is hit.
        assert_eq!(3, plans.len());

        // The last rotation brings the plan back to where it started.
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    /// The first pass reverses the projection and later passes rotate it, so
    /// the cycle closes on the first optimized plan rather than the initial
    /// one; the optimizer must detect that as well.
    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        // The outer single-expression projection is never rotated, so the
        // root schema stays fixed while the inner projection changes.
        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // One reversal plus three rotations.
        assert_eq!(4, plans.len());

        // The final plan is the one produced by the first pass.
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` in which field `i` carries the metadata
    /// entry `"key" -> "value {i}"`; schema-level metadata is kept as is.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// Observer that ignores every notification.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// Rule whose rewrite always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Rule that discards its input and returns the test table scan.
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// Rule that permutes the expressions of any projection with at least
    /// two expressions: rotate left by one, or reverse on the first
    /// invocation when so configured.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        /// `true` until the first matching projection has been reversed.
        /// Held in a `Mutex` because `rewrite` only receives `&self`.
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            // Anything other than a projection with at least two
            // expressions is passed through untouched.
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut exprs = projection.expr.clone();

            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
            if *reverse {
                exprs.reverse();
                *reverse = false;
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, Arc::clone(&projection.input))?,
            )))
        }
    }
}