1use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{InvariantLevel, assert_expected_schema};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{DFSchema, DataFusionError, HashSet, Result, internal_err};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_outer_join::EliminateOuterJoin;
45use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
46use crate::extract_leaf_expressions::{ExtractLeafExpressions, PushDownLeafProjections};
47use crate::filter_null_join_keys::FilterNullJoinKeys;
48use crate::optimize_projections::OptimizeProjections;
49use crate::optimize_unions::OptimizeUnions;
50use crate::plan_signature::LogicalPlanSignature;
51use crate::propagate_empty_relation::PropagateEmptyRelation;
52use crate::push_down_filter::PushDownFilter;
53use crate::push_down_limit::PushDownLimit;
54use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55use crate::rewrite_set_comparison::RewriteSetComparison;
56use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
57use crate::simplify_expressions::SimplifyExpressions;
58use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
59use crate::unions_to_filter::UnionsToFilter;
60use crate::utils::log_plan;
61
/// A rule that rewrites a [`LogicalPlan`] into another [`LogicalPlan`].
///
/// Rules are executed by [`Optimizer::optimize`]. After each rule runs, the
/// optimizer checks the plan the rule returned against the schema of the plan
/// the rule was given, so a rule is expected to keep the plan's output schema
/// compatible.
///
/// Implementors must provide [`name`](Self::name) and should override
/// [`rewrite`](Self::rewrite): the default `rewrite` always fails.
pub trait OptimizerRule: Debug {
    /// A human-readable name for this rule.
    ///
    /// The optimizer uses it in log messages and in the context attached to
    /// errors raised while the rule runs.
    fn name(&self) -> &str;

    /// Tells the optimizer how to traverse the plan when applying this rule.
    ///
    /// * `Some(order)`: the optimizer drives the recursion and calls
    ///   [`rewrite`](Self::rewrite) on individual plan nodes in the given
    ///   [`ApplyOrder`].
    /// * `None` (the default): [`rewrite`](Self::rewrite) is called once with
    ///   the root of the plan, and the rule is responsible for any recursion.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    /// Deprecated and not consulted by the optimizer in this module; the
    /// default simply returns `true`.
    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
    fn supports_rewrite(&self) -> bool {
        true
    }

    /// Attempts to rewrite `plan`, taking ownership of it.
    ///
    /// Returns the resulting plan wrapped in [`Transformed`], whose
    /// `transformed` flag tells the optimizer whether the rule changed
    /// anything (the optimizer only uses the flag to decide what to log).
    ///
    /// # Errors
    ///
    /// The default implementation always returns an internal error stating
    /// that `rewrite` is not implemented for the rule named by
    /// [`name`](Self::name).
    fn rewrite(
        &self,
        _plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        internal_err!("rewrite is not implemented for {}", self.name())
    }
}
134
/// Read-only settings and shared state made available to every
/// [`OptimizerRule`] while it rewrites a plan.
pub trait OptimizerConfig {
    /// The query execution start time, or `None` if it is not available.
    ///
    /// NOTE(review): presumably consumed by rules that evaluate
    /// time-dependent expressions; confirm against the rules that call it.
    fn query_execution_start_time(&self) -> Option<DateTime<Utc>>;

    /// The shared [`AliasGenerator`] for this optimization run.
    fn alias_generator(&self) -> &Arc<AliasGenerator>;

    /// The configuration options in effect, as a shared handle.
    fn options(&self) -> Arc<ConfigOptions>;

    /// An optional [`FunctionRegistry`] rules may consult.
    ///
    /// The default implementation provides none.
    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
        None
    }
}
151
/// A standalone [`OptimizerConfig`] implementation that owns its settings.
#[derive(Debug)]
pub struct OptimizerContext {
    /// Value reported by `query_execution_start_time()`. It is set to the
    /// current UTC time on construction and can be replaced or cleared with
    /// the builder methods.
    query_execution_start_time: Option<DateTime<Utc>>,

    /// Alias generator handed out by `alias_generator()`; a fresh one is
    /// created for every context.
    alias_generator: Arc<AliasGenerator>,

    /// Configuration options handed out by `options()`.
    options: Arc<ConfigOptions>,
}
166
167impl OptimizerContext {
168 pub fn new() -> Self {
170 let mut options = ConfigOptions::default();
171 options.optimizer.filter_null_join_keys = true;
172
173 Self::new_with_config_options(Arc::new(options))
174 }
175
176 pub fn new_with_config_options(options: Arc<ConfigOptions>) -> Self {
178 Self {
179 query_execution_start_time: Some(Utc::now()),
180 alias_generator: Arc::new(AliasGenerator::new()),
181 options,
182 }
183 }
184
185 pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
187 Arc::make_mut(&mut self.options)
188 .optimizer
189 .filter_null_join_keys = filter_null_keys;
190 self
191 }
192
193 pub fn with_query_execution_start_time(
195 mut self,
196 query_execution_start_time: DateTime<Utc>,
197 ) -> Self {
198 self.query_execution_start_time = Some(query_execution_start_time);
199 self
200 }
201
202 pub fn without_query_execution_start_time(mut self) -> Self {
205 self.query_execution_start_time = None;
206 self
207 }
208
209 pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
212 Arc::make_mut(&mut self.options).optimizer.skip_failed_rules = b;
213 self
214 }
215
216 pub fn with_max_passes(mut self, v: u8) -> Self {
218 Arc::make_mut(&mut self.options).optimizer.max_passes = v as usize;
219 self
220 }
221}
222
223impl Default for OptimizerContext {
224 fn default() -> Self {
226 Self::new()
227 }
228}
229
230impl OptimizerConfig for OptimizerContext {
231 fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
232 self.query_execution_start_time
233 }
234
235 fn alias_generator(&self) -> &Arc<AliasGenerator> {
236 &self.alias_generator
237 }
238
239 fn options(&self) -> Arc<ConfigOptions> {
240 Arc::clone(&self.options)
241 }
242}
243
/// A logical-plan optimizer: an ordered collection of [`OptimizerRule`]s
/// that `Optimizer::optimize` applies to a [`LogicalPlan`].
#[derive(Clone, Debug)]
pub struct Optimizer {
    /// The rules to apply. On every pass they run in the order they appear
    /// in this vector.
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
250
/// The traversal order in which the optimizer applies a rule to the nodes of
/// a plan when the rule's `apply_order()` returns `Some`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ApplyOrder {
    /// Apply the rule from the rewriter's `f_down` hook, i.e. while
    /// descending the plan tree.
    TopDown,
    /// Apply the rule from the rewriter's `f_up` hook, i.e. while ascending
    /// back up the plan tree.
    BottomUp,
}
262
263impl Default for Optimizer {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
269impl Optimizer {
270 pub fn new() -> Self {
272 let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
282 Arc::new(RewriteSetComparison::new()),
283 Arc::new(OptimizeUnions::new()),
284 Arc::new(UnionsToFilter::new()),
285 Arc::new(SimplifyExpressions::new()),
286 Arc::new(ReplaceDistinctWithAggregate::new()),
287 Arc::new(EliminateJoin::new()),
288 Arc::new(DecorrelatePredicateSubquery::new()),
289 Arc::new(ScalarSubqueryToJoin::new()),
290 Arc::new(DecorrelateLateralJoin::new()),
291 Arc::new(ExtractEquijoinPredicate::new()),
292 Arc::new(EliminateDuplicatedExpr::new()),
293 Arc::new(EliminateFilter::new()),
294 Arc::new(EliminateCrossJoin::new()),
295 Arc::new(EliminateLimit::new()),
296 Arc::new(PropagateEmptyRelation::new()),
297 Arc::new(FilterNullJoinKeys::default()),
298 Arc::new(EliminateOuterJoin::new()),
299 Arc::new(PushDownLimit::new()),
301 Arc::new(PushDownFilter::new()),
302 Arc::new(SingleDistinctToGroupBy::new()),
303 Arc::new(EliminateGroupByConstant::new()),
306 Arc::new(CommonSubexprEliminate::new()),
307 Arc::new(ExtractLeafExpressions::new()),
308 Arc::new(PushDownLeafProjections::new()),
309 Arc::new(OptimizeProjections::new()),
310 ];
311
312 Self::with_rules(rules)
313 }
314
315 pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
317 Self { rules }
318 }
319}
320
321struct Rewriter<'a> {
323 apply_order: ApplyOrder,
324 rule: &'a dyn OptimizerRule,
325 config: &'a dyn OptimizerConfig,
326}
327
328impl<'a> Rewriter<'a> {
329 fn new(
330 apply_order: ApplyOrder,
331 rule: &'a dyn OptimizerRule,
332 config: &'a dyn OptimizerConfig,
333 ) -> Self {
334 Self {
335 apply_order,
336 rule,
337 config,
338 }
339 }
340}
341
342impl TreeNodeRewriter for Rewriter<'_> {
343 type Node = LogicalPlan;
344
345 fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
346 if self.apply_order == ApplyOrder::TopDown {
347 self.rule.rewrite(node, self.config)
348 } else {
349 Ok(Transformed::no(node))
350 }
351 }
352
353 fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
354 if self.apply_order == ApplyOrder::BottomUp {
355 self.rule.rewrite(node, self.config)
356 } else {
357 Ok(Transformed::no(node))
358 }
359 }
360}
361
impl Optimizer {
    /// Optimizes `plan` by repeatedly applying every rule in `self.rules`.
    ///
    /// Each pass runs all rules in order. Passes repeat until either
    /// `options.optimizer.max_passes` passes have run, or a pass ends with a
    /// plan whose signature has already been seen (the initial plan or the
    /// result of an earlier pass).
    ///
    /// `observer` is called with the current plan and the rule after every
    /// rule application that succeeds, whether or not the rule changed the
    /// plan. It is not called for rules that fail and are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * the input plan fails the `Executable` invariant check;
    /// * a rule fails (or its output fails the per-rule checks) and
    ///   `options.optimizer.skip_failed_rules` is `false`;
    /// * the final plan fails the schema check against the input plan's
    ///   schema or the `Executable` invariant check.
    pub fn optimize<F>(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
        mut observer: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
    {
        // Verify the plan is valid before the first optimizer pass.
        plan.check_invariants(InvariantLevel::Executable)
            .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;

        let start_time = Instant::now();
        let options = config.options();
        let mut new_plan = plan;

        // Signatures of every plan seen at a pass boundary, seeded with the
        // input plan; used to detect that optimization has stopped making
        // progress (or has cycled back to an earlier plan).
        let mut previous_plans = HashSet::with_capacity(16);
        previous_plans.insert(LogicalPlanSignature::new(&new_plan));

        // Schema of the input plan, checked against the final plan below.
        let starting_schema = Arc::clone(new_plan.schema());

        let mut i = 0;
        while i < options.optimizer.max_passes {
            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);

            for rule in &self.rules {
                // Rewriting consumes the plan, so when failed rules are to be
                // skipped keep a copy to fall back to.
                let prev_plan = options
                    .optimizer
                    .skip_failed_rules
                    .then(|| new_plan.clone());

                // Shadows the outer binding: this is the schema going into
                // this particular rule, used for the per-rule check.
                let starting_schema = Arc::clone(new_plan.schema());

                let result = match rule.apply_order() {
                    // The optimizer drives the recursion over the plan.
                    Some(apply_order) => new_plan.rewrite_with_subqueries(
                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
                    ),
                    // The rule handles recursion itself.
                    None => {
                        rule.rewrite(new_plan, config)
                    },
                }
                .and_then(|tnr| {
                    // Optimizer-specific check, run after every rule.
                    assert_valid_optimization(&tnr.data, &starting_schema)
                        .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;

                    // The full plan invariant check is only compiled into
                    // debug builds.
                    #[cfg(debug_assertions)]
                    tnr.data.check_invariants(InvariantLevel::Executable)
                        .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;

                    Ok(tnr)
                });

                // Handle the outcome of the rule.
                match (result, prev_plan) {
                    // The rule succeeded.
                    (
                        Ok(Transformed {
                            data, transformed, ..
                        }),
                        _,
                    ) => {
                        new_plan = data;
                        observer(&new_plan, rule.as_ref());
                        if transformed {
                            log_plan(rule.name(), &new_plan);
                        } else {
                            debug!(
                                "Plan unchanged by optimizer rule '{}' (pass {})",
                                rule.name(),
                                i
                            );
                        }
                    }
                    // The rule failed but failures are skipped: warn and
                    // restore the plan saved before the rule ran.
                    (Err(e), Some(orig_plan)) => {
                        warn!(
                            "Skipping optimizer rule '{}' due to unexpected error: {}",
                            rule.name(),
                            e
                        );
                        new_plan = orig_plan;
                    }
                    // The rule failed and failures are not skipped: abort.
                    (Err(e), None) => {
                        return Err(e.context(format!(
                            "Optimizer rule '{}' failed",
                            rule.name()
                        )));
                    }
                }
            }
            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);

            // `insert` reports whether the signature was newly added.
            let plan_is_fresh =
                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
            if !plan_is_fresh {
                // This plan has been seen before, so further passes would
                // only repeat earlier work.
                debug!("optimizer pass {i} did not make changes");
                break;
            }
            i += 1;
        }

        // Final checks: compare against the schema of the original input
        // plan (the outer `starting_schema`), then verify plan invariants.
        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
            e.context("Check optimizer-specific invariants after all passes")
        })?;

        new_plan
            .check_invariants(InvariantLevel::Executable)
            .map_err(|e| {
                e.context("Invalid (non-executable) plan after LP Optimizers")
            })?;

        log_plan("Final optimized plan", &new_plan);
        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
        Ok(new_plan)
    }
}
497
498fn assert_valid_optimization(
503 plan: &LogicalPlan,
504 prev_schema: &Arc<DFSchema>,
505) -> Result<()> {
506 assert_expected_schema(prev_schema, plan)?;
509
510 Ok(())
511}
512
#[cfg(test)]
mod tests {
    //! Tests for the optimizer driver: failure handling, schema checks and
    //! detection of plans that repeat across passes.

    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, Projection, col, lit};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// Builds the `EmptyRelation` plan (empty schema, no rows) that several
    /// tests feed into the optimizer.
    fn empty_relation_plan() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    /// A failing rule is tolerated when `skip_failed_rules` is enabled.
    #[test]
    fn skip_failing_rule() {
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let optimizer = Optimizer::with_rules(vec![Arc::new(BadRule {})]);

        optimizer
            .optimize(empty_relation_plan(), &config, &observe)
            .unwrap();
    }

    /// A failing rule aborts optimization when `skip_failed_rules` is off,
    /// and the error names the rule.
    #[test]
    fn no_skip_failing_rule() {
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let optimizer = Optimizer::with_rules(vec![Arc::new(BadRule {})]);

        let err = optimizer
            .optimize(empty_relation_plan(), &config, &observe)
            .unwrap_err();

        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
             Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    /// A rule that changes the plan's schema is reported as an error.
    #[test]
    fn generate_different_schema() {
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let optimizer = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);

        let err = optimizer
            .optimize(empty_relation_plan(), &config, &observe)
            .unwrap_err();

        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    /// The same schema-changing rule is skipped when `skip_failed_rules` is
    /// enabled.
    #[test]
    fn skip_generate_different_schema() {
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let optimizer = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);

        optimizer
            .optimize(empty_relation_plan(), &config, &observe)
            .unwrap();
    }

    /// A rule whose output differs from its input only in field metadata is
    /// accepted by the optimizer.
    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let optimizer = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);

        let scan = Arc::new(test_table_scan()?);
        let scan_schema = Arc::clone(scan.schema());

        // Project the scan's columns through a schema that carries extra
        // per-field metadata.
        let schema_with_metadata = add_metadata_to_fields(scan_schema.as_ref());
        let projection = Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            scan,
            schema_with_metadata,
        )?;
        let plan = LogicalPlan::Projection(projection);

        // Before optimization the schemas differ (by metadata).
        assert_ne!(plan.schema().as_ref(), scan_schema.as_ref());

        // Optimization succeeds and yields the plain table-scan schema.
        let optimized_plan = optimizer.optimize(plan, &config, &observe)?;
        assert_eq!(optimized_plan.schema().as_ref(), scan_schema.as_ref());

        Ok(())
    }

    /// Optimization stops once a pass reproduces the initial plan.
    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        let config = OptimizerContext::new().with_max_passes(16);
        let optimizer =
            Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);

        // The outer single-expression projection is never rotated, so the
        // root schema stays the same while the inner projection changes.
        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut observed: Vec<LogicalPlan> = Vec::new();
        let final_plan = optimizer.optimize(initial_plan.clone(), &config, |p, _| {
            observed.push(p.clone())
        })?;

        // Rotating three expressions three times restores the initial order.
        assert_eq!(3, observed.len());

        // The final plan is the initial plan again.
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    /// Optimization stops once a pass reproduces a plan from an earlier pass
    /// that is not the initial plan.
    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        let config = OptimizerContext::new().with_max_passes(16);
        let optimizer =
            Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);

        // The outer single-expression projection is never rotated, so the
        // root schema stays the same while the inner projection changes.
        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut observed: Vec<LogicalPlan> = Vec::new();
        let final_plan = optimizer
            .optimize(initial_plan, &config, |p, _| observed.push(p.clone()))?;

        // One reversal followed by three rotations leads back to the plan
        // produced by the first pass.
        assert_eq!(4, observed.len());

        // The final plan equals the plan after the first pass.
        assert_eq!(observed[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` in which field `i` carries the metadata
    /// entry `"key" -> "value {i}"`; schema-level metadata is preserved.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let mut annotated_fields = Vec::new();
        for (i, (qualifier, field)) in schema.iter().enumerate() {
            let metadata = [("key".into(), format!("value {i}"))].into_iter().collect();
            let annotated = field.as_ref().clone().with_metadata(metadata);
            annotated_fields.push((qualifier.cloned(), Arc::new(annotated)));
        }

        let schema_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(annotated_fields, schema_metadata).unwrap())
    }

    /// Observer that ignores every notification.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// A rule that always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// A rule that discards its input and returns the test table scan.
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let scan = test_table_scan()?;
            let rebuilt = LogicalPlanBuilder::from(scan).build()?;
            Ok(Transformed::yes(rebuilt))
        }
    }

    /// A rule that permutes the expressions of every projection with at
    /// least two expressions: it rotates them left by one, except that the
    /// first application reverses them when `reverse_on_first_pass` is set.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        /// `true` until the first qualifying projection has been rewritten
        /// (if the rule was created with the flag set).
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            let reverse_on_first_pass = Mutex::new(reverse_on_first_pass);
            Self {
                reverse_on_first_pass,
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            match plan {
                LogicalPlan::Projection(projection) if projection.expr.len() >= 2 => {
                    let mut exprs = projection.expr.clone();

                    // The flag is consumed by the first rewrite; afterwards
                    // the rule only rotates.
                    let mut pending_reverse = self.reverse_on_first_pass.lock().unwrap();
                    if *pending_reverse {
                        exprs.reverse();
                        *pending_reverse = false;
                    } else {
                        exprs.rotate_left(1);
                    }

                    let rotated = Projection::try_new(exprs, Arc::clone(&projection.input))?;
                    Ok(Transformed::yes(LogicalPlan::Projection(rotated)))
                }
                // Anything else is passed through unchanged.
                other => Ok(Transformed::no(other)),
            }
        }
    }
}