1use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{InvariantLevel, assert_expected_schema};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{DFSchema, DataFusionError, HashSet, Result, internal_err};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_outer_join::EliminateOuterJoin;
45use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
46use crate::extract_leaf_expressions::{ExtractLeafExpressions, PushDownLeafProjections};
47use crate::filter_null_join_keys::FilterNullJoinKeys;
48use crate::optimize_projections::OptimizeProjections;
49use crate::optimize_unions::OptimizeUnions;
50use crate::plan_signature::LogicalPlanSignature;
51use crate::propagate_empty_relation::PropagateEmptyRelation;
52use crate::push_down_filter::PushDownFilter;
53use crate::push_down_limit::PushDownLimit;
54use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55use crate::rewrite_set_comparison::RewriteSetComparison;
56use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
57use crate::simplify_expressions::SimplifyExpressions;
58use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
59use crate::utils::log_plan;
60
61pub trait OptimizerRule: Debug {
74 fn name(&self) -> &str;
76
77 fn apply_order(&self) -> Option<ApplyOrder> {
82 None
83 }
84
85 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
87 fn supports_rewrite(&self) -> bool {
88 true
89 }
90
91 fn rewrite(
126 &self,
127 _plan: LogicalPlan,
128 _config: &dyn OptimizerConfig,
129 ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
130 internal_err!("rewrite is not implemented for {}", self.name())
131 }
132}
133
134pub trait OptimizerConfig {
136 fn query_execution_start_time(&self) -> Option<DateTime<Utc>>;
140
141 fn alias_generator(&self) -> &Arc<AliasGenerator>;
143
144 fn options(&self) -> Arc<ConfigOptions>;
145
146 fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
147 None
148 }
149}
150
151#[derive(Debug)]
154pub struct OptimizerContext {
155 query_execution_start_time: Option<DateTime<Utc>>,
159
160 alias_generator: Arc<AliasGenerator>,
162
163 options: Arc<ConfigOptions>,
164}
165
166impl OptimizerContext {
167 pub fn new() -> Self {
169 let mut options = ConfigOptions::default();
170 options.optimizer.filter_null_join_keys = true;
171
172 Self::new_with_config_options(Arc::new(options))
173 }
174
175 pub fn new_with_config_options(options: Arc<ConfigOptions>) -> Self {
177 Self {
178 query_execution_start_time: Some(Utc::now()),
179 alias_generator: Arc::new(AliasGenerator::new()),
180 options,
181 }
182 }
183
184 pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
186 Arc::make_mut(&mut self.options)
187 .optimizer
188 .filter_null_join_keys = filter_null_keys;
189 self
190 }
191
192 pub fn with_query_execution_start_time(
194 mut self,
195 query_execution_start_time: DateTime<Utc>,
196 ) -> Self {
197 self.query_execution_start_time = Some(query_execution_start_time);
198 self
199 }
200
201 pub fn without_query_execution_start_time(mut self) -> Self {
204 self.query_execution_start_time = None;
205 self
206 }
207
208 pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
211 Arc::make_mut(&mut self.options).optimizer.skip_failed_rules = b;
212 self
213 }
214
215 pub fn with_max_passes(mut self, v: u8) -> Self {
217 Arc::make_mut(&mut self.options).optimizer.max_passes = v as usize;
218 self
219 }
220}
221
222impl Default for OptimizerContext {
223 fn default() -> Self {
225 Self::new()
226 }
227}
228
229impl OptimizerConfig for OptimizerContext {
230 fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
231 self.query_execution_start_time
232 }
233
234 fn alias_generator(&self) -> &Arc<AliasGenerator> {
235 &self.alias_generator
236 }
237
238 fn options(&self) -> Arc<ConfigOptions> {
239 Arc::clone(&self.options)
240 }
241}
242
243#[derive(Clone, Debug)]
245pub struct Optimizer {
246 pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
248}
249
/// Direction in which the optimizer applies a rule while it walks a plan on
/// the rule's behalf (see [`OptimizerRule::apply_order`]).
///
/// The enum is a plain, field-less value type, so in addition to `PartialEq`
/// it implements `Eq` and `Hash`; this allows it to be used as a key in hash
/// sets and maps and in `const`-pattern style comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApplyOrder {
    /// The rule is applied to a node while descending the plan (the
    /// rewriter's `f_down` step).
    TopDown,
    /// The rule is applied to a node while coming back up the plan (the
    /// rewriter's `f_up` step).
    BottomUp,
}
261
262impl Default for Optimizer {
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
268impl Optimizer {
269 pub fn new() -> Self {
271 let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
281 Arc::new(RewriteSetComparison::new()),
282 Arc::new(OptimizeUnions::new()),
283 Arc::new(SimplifyExpressions::new()),
284 Arc::new(ReplaceDistinctWithAggregate::new()),
285 Arc::new(EliminateJoin::new()),
286 Arc::new(DecorrelatePredicateSubquery::new()),
287 Arc::new(ScalarSubqueryToJoin::new()),
288 Arc::new(DecorrelateLateralJoin::new()),
289 Arc::new(ExtractEquijoinPredicate::new()),
290 Arc::new(EliminateDuplicatedExpr::new()),
291 Arc::new(EliminateFilter::new()),
292 Arc::new(EliminateCrossJoin::new()),
293 Arc::new(EliminateLimit::new()),
294 Arc::new(PropagateEmptyRelation::new()),
295 Arc::new(FilterNullJoinKeys::default()),
296 Arc::new(EliminateOuterJoin::new()),
297 Arc::new(PushDownLimit::new()),
299 Arc::new(PushDownFilter::new()),
300 Arc::new(SingleDistinctToGroupBy::new()),
301 Arc::new(EliminateGroupByConstant::new()),
304 Arc::new(CommonSubexprEliminate::new()),
305 Arc::new(ExtractLeafExpressions::new()),
306 Arc::new(PushDownLeafProjections::new()),
307 Arc::new(OptimizeProjections::new()),
308 ];
309
310 Self::with_rules(rules)
311 }
312
313 pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
315 Self { rules }
316 }
317}
318
319struct Rewriter<'a> {
321 apply_order: ApplyOrder,
322 rule: &'a dyn OptimizerRule,
323 config: &'a dyn OptimizerConfig,
324}
325
326impl<'a> Rewriter<'a> {
327 fn new(
328 apply_order: ApplyOrder,
329 rule: &'a dyn OptimizerRule,
330 config: &'a dyn OptimizerConfig,
331 ) -> Self {
332 Self {
333 apply_order,
334 rule,
335 config,
336 }
337 }
338}
339
340impl TreeNodeRewriter for Rewriter<'_> {
341 type Node = LogicalPlan;
342
343 fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
344 if self.apply_order == ApplyOrder::TopDown {
345 self.rule.rewrite(node, self.config)
346 } else {
347 Ok(Transformed::no(node))
348 }
349 }
350
351 fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
352 if self.apply_order == ApplyOrder::BottomUp {
353 self.rule.rewrite(node, self.config)
354 } else {
355 Ok(Transformed::no(node))
356 }
357 }
358}
359
360impl Optimizer {
361 pub fn optimize<F>(
364 &self,
365 plan: LogicalPlan,
366 config: &dyn OptimizerConfig,
367 mut observer: F,
368 ) -> Result<LogicalPlan>
369 where
370 F: FnMut(&LogicalPlan, &dyn OptimizerRule),
371 {
372 plan.check_invariants(InvariantLevel::Executable)
374 .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
375
376 let start_time = Instant::now();
377 let options = config.options();
378 let mut new_plan = plan;
379
380 let mut previous_plans = HashSet::with_capacity(16);
381 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
382
383 let starting_schema = Arc::clone(new_plan.schema());
384
385 let mut i = 0;
386 while i < options.optimizer.max_passes {
387 log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
388
389 for rule in &self.rules {
390 let prev_plan = options
393 .optimizer
394 .skip_failed_rules
395 .then(|| new_plan.clone());
396
397 let starting_schema = Arc::clone(new_plan.schema());
398
399 let result = match rule.apply_order() {
400 Some(apply_order) => new_plan.rewrite_with_subqueries(
402 &mut Rewriter::new(apply_order, rule.as_ref(), config),
403 ),
404 None => {
406 rule.rewrite(new_plan, config)
407 },
408 }
409 .and_then(|tnr| {
410 assert_valid_optimization(&tnr.data, &starting_schema)
412 .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
413
414 #[cfg(debug_assertions)]
416 tnr.data.check_invariants(InvariantLevel::Executable)
417 .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
418
419 Ok(tnr)
420 });
421
422 match (result, prev_plan) {
424 (
426 Ok(Transformed {
427 data, transformed, ..
428 }),
429 _,
430 ) => {
431 new_plan = data;
432 observer(&new_plan, rule.as_ref());
433 if transformed {
434 log_plan(rule.name(), &new_plan);
435 } else {
436 debug!(
437 "Plan unchanged by optimizer rule '{}' (pass {})",
438 rule.name(),
439 i
440 );
441 }
442 }
443 (Err(e), Some(orig_plan)) => {
446 warn!(
450 "Skipping optimizer rule '{}' due to unexpected error: {}",
451 rule.name(),
452 e
453 );
454 new_plan = orig_plan;
455 }
456 (Err(e), None) => {
458 return Err(e.context(format!(
459 "Optimizer rule '{}' failed",
460 rule.name()
461 )));
462 }
463 }
464 }
465 log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
466
467 let plan_is_fresh =
469 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
470 if !plan_is_fresh {
471 debug!("optimizer pass {i} did not make changes");
473 break;
474 }
475 i += 1;
476 }
477
478 assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
480 e.context("Check optimizer-specific invariants after all passes")
481 })?;
482
483 new_plan
485 .check_invariants(InvariantLevel::Executable)
486 .map_err(|e| {
487 e.context("Invalid (non-executable) plan after LP Optimizers")
488 })?;
489
490 log_plan("Final optimized plan", &new_plan);
491 debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
492 Ok(new_plan)
493 }
494}
495
496fn assert_valid_optimization(
501 plan: &LogicalPlan,
502 prev_schema: &Arc<DFSchema>,
503) -> Result<()> {
504 assert_expected_schema(prev_schema, plan)?;
507
508 Ok(())
509}
510
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, Projection, col, lit};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// A failing rule is tolerated when failing rules are skipped.
    #[test]
    fn skip_failing_rule() {
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule::default())]);

        opt.optimize(empty_relation_plan(), &config, observe)
            .unwrap();
    }

    /// A failing rule aborts optimization when failing rules are not skipped,
    /// and the error names the rule.
    #[test]
    fn no_skip_failing_rule() {
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule::default())]);

        let err = opt
            .optimize(empty_relation_plan(), &config, observe)
            .unwrap_err();

        let expected =
            "Optimizer rule 'bad rule' failed\ncaused by\nError during planning: rule failed";
        assert_eq!(expected, err.strip_backtrace());
    }

    /// A rule that changes the schema of the plan is reported as an error.
    #[test]
    fn generate_different_schema() {
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule::default())]);

        let err = opt
            .optimize(empty_relation_plan(), &config, observe)
            .unwrap_err();

        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    /// A rule that changes the schema of the plan is skipped like any other
    /// failing rule when skipping is enabled.
    #[test]
    fn skip_generate_different_schema() {
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule::default())]);

        opt.optimize(empty_relation_plan(), &config, observe)
            .unwrap();
    }

    /// A plan whose schema differs from the rule's output only in field
    /// metadata is accepted by the schema check.
    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule::default())]);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        // Same columns as the scan, but every field carries extra metadata.
        let schema_with_metadata = add_metadata_to_fields(input_schema.as_ref());
        let projection = Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            schema_with_metadata,
        )?;
        let plan = LogicalPlan::Projection(projection);

        // Before optimization the two schemas compare unequal ...
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());

        // ... and the rule replaces the plan with a bare table scan.
        let optimized_plan = opt.optimize(plan, &config, observe)?;
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());

        Ok(())
    }

    /// Rotating a three-expression projection restores the initial plan on
    /// the third pass, at which point the optimizer must stop.
    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        let config = OptimizerContext::new().with_max_passes(16);
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let record = |p: &LogicalPlan, _: &dyn OptimizerRule| plans.push(p.clone());
        let final_plan = opt.optimize(initial_plan.clone(), &config, record)?;

        // One rule, hence one recorded plan per pass: three passes in total.
        assert_eq!(3, plans.len());

        // The cycle closes on the plan optimization started from.
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    /// Reversing on the first pass and rotating afterwards leads back to the
    /// plan produced by the first pass, never to the initial plan.
    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        let config = OptimizerContext::new().with_max_passes(16);
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let record = |p: &LogicalPlan, _: &dyn OptimizerRule| plans.push(p.clone());
        let final_plan = opt.optimize(initial_plan, &config, record)?;

        // One reversal followed by three rotations.
        assert_eq!(4, plans.len());

        // The cycle closes on the output of the first pass.
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Plan shared by the tests that only need *some* valid input: an empty
    /// relation with an empty schema.
    fn empty_relation_plan() -> LogicalPlan {
        let schema = Arc::new(DFSchema::empty());
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema,
        })
    }

    /// Returns a copy of `schema` in which the field at position `i` carries
    /// the metadata entry `key` = `value {i}`; the schema-level metadata is
    /// kept as is.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let mut new_fields = Vec::new();
        for (i, (qualifier, field)) in schema.iter().enumerate() {
            let metadata = [("key".into(), format!("value {i}"))]
                .into_iter()
                .collect();
            let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
            new_fields.push((qualifier.cloned(), Arc::new(new_arrow_field)));
        }

        let new_metadata = schema.metadata().clone();
        let new_schema = DFSchema::new_with_metadata(new_fields, new_metadata).unwrap();
        Arc::new(new_schema)
    }

    /// Observer that ignores everything it is told.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// Rule that fails unconditionally.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Rule that discards its input and returns the test table scan instead.
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let replacement = LogicalPlanBuilder::from(test_table_scan()?).build()?;
            Ok(Transformed::yes(replacement))
        }
    }

    /// Rule that permutes the expressions of every projection that has at
    /// least two of them, so that repeated application cycles through a
    /// small set of plans.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        /// While `true`, the next permutation is a reversal (and the flag is
        /// cleared); otherwise the permutation is a rotation to the left by
        /// one position.
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            let reverse_on_first_pass = Mutex::new(reverse_on_first_pass);
            Self {
                reverse_on_first_pass,
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            // Anything but a projection with two or more expressions is
            // handed back unchanged.
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                other => return Ok(Transformed::no(other)),
            };

            let mut exprs = projection.expr.clone();

            // `take` reads the flag and leaves `false` behind, so the
            // reversal happens at most once.
            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
            if std::mem::take(&mut *reverse) {
                exprs.reverse();
            } else {
                exprs.rotate_left(1);
            }

            let rotated = Projection::try_new(exprs, Arc::clone(&projection.input))?;
            Ok(Transformed::yes(LogicalPlan::Projection(rotated)))
        }
    }
}