1use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{assert_expected_schema, InvariantLevel};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
37use crate::eliminate_cross_join::EliminateCrossJoin;
38use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
39use crate::eliminate_filter::EliminateFilter;
40use crate::eliminate_group_by_constant::EliminateGroupByConstant;
41use crate::eliminate_join::EliminateJoin;
42use crate::eliminate_limit::EliminateLimit;
43use crate::eliminate_nested_union::EliminateNestedUnion;
44use crate::eliminate_one_union::EliminateOneUnion;
45use crate::eliminate_outer_join::EliminateOuterJoin;
46use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
47use crate::filter_null_join_keys::FilterNullJoinKeys;
48use crate::optimize_projections::OptimizeProjections;
49use crate::plan_signature::LogicalPlanSignature;
50use crate::propagate_empty_relation::PropagateEmptyRelation;
51use crate::push_down_filter::PushDownFilter;
52use crate::push_down_limit::PushDownLimit;
53use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
54use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
55use crate::simplify_expressions::SimplifyExpressions;
56use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
57use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
58use crate::utils::log_plan;
59
/// A rewrite rule that the [`Optimizer`] applies to a [`LogicalPlan`].
///
/// New rules should implement [`OptimizerRule::rewrite`] and leave
/// [`OptimizerRule::supports_rewrite`] at its default of `true`. The optimizer
/// only calls the deprecated [`OptimizerRule::try_optimize`] when
/// `supports_rewrite` returns `false`.
pub trait OptimizerRule: Debug {
    /// Legacy, borrowing form of [`OptimizerRule::rewrite`].
    ///
    /// Return `Ok(Some(new_plan))` when the rule rewrote the plan, or
    /// `Ok(None)` when it made no change (the optimizer then keeps the
    /// original plan). The default implementation returns an internal error.
    #[deprecated(
        since = "40.0.0",
        note = "please implement supports_rewrite and rewrite instead"
    )]
    fn try_optimize(
        &self,
        _plan: &LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        internal_err!("Should have called rewrite")
    }

    /// Human-readable name of the rule. The optimizer uses it in log output
    /// and in the context it attaches to errors raised while the rule runs.
    fn name(&self) -> &str;

    /// How the optimizer should drive recursion for this rule.
    ///
    /// * `Some(order)`: the optimizer walks the plan (and its subqueries) in
    ///   the given [`ApplyOrder`] and calls the rule once per node.
    /// * `None` (the default): the optimizer calls the rule once on the root
    ///   plan, and the rule must recurse into inputs itself.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    /// Whether this rule implements [`OptimizerRule::rewrite`].
    ///
    /// Defaults to `true`. Return `false` only for legacy rules that still
    /// implement [`OptimizerRule::try_optimize`].
    fn supports_rewrite(&self) -> bool {
        true
    }

    /// Rewrites `plan`, taking ownership of it.
    ///
    /// Return `Transformed::yes(new_plan)` when the plan changed and
    /// `Transformed::no(plan)` when it did not.
    ///
    /// # Errors
    ///
    /// The default implementation returns an internal error that names the
    /// rule; rules that report `supports_rewrite() == true` must override it.
    fn rewrite(
        &self,
        _plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        internal_err!("rewrite is not implemented for {}", self.name())
    }
}
120
/// Configuration and shared state made available to every [`OptimizerRule`].
pub trait OptimizerConfig {
    /// Time at which execution of the current query started.
    fn query_execution_start_time(&self) -> DateTime<Utc>;

    /// Generator used to create unique aliases while rewriting plans.
    fn alias_generator(&self) -> &Arc<AliasGenerator>;

    /// Session configuration. The optimizer reads `options().optimizer.*`
    /// (e.g. `max_passes`, `skip_failed_rules`).
    fn options(&self) -> &ConfigOptions;

    /// Function registry available to rules, if any. The default
    /// implementation returns `None`.
    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
        None
    }
}
136
/// Simple, self-contained [`OptimizerConfig`] implementation holding a query
/// start time, an [`AliasGenerator`] and a set of [`ConfigOptions`].
#[derive(Debug)]
pub struct OptimizerContext {
    /// Returned by [`OptimizerConfig::query_execution_start_time`]; set to
    /// `Utc::now()` by [`OptimizerContext::new`] unless overridden.
    query_execution_start_time: DateTime<Utc>,

    /// Alias generator handed out (by reference to the `Arc`) through
    /// [`OptimizerConfig::alias_generator`].
    alias_generator: Arc<AliasGenerator>,

    /// Configuration options returned by [`OptimizerConfig::options`].
    options: ConfigOptions,
}
150
151impl OptimizerContext {
152 pub fn new() -> Self {
154 let mut options = ConfigOptions::default();
155 options.optimizer.filter_null_join_keys = true;
156
157 Self {
158 query_execution_start_time: Utc::now(),
159 alias_generator: Arc::new(AliasGenerator::new()),
160 options,
161 }
162 }
163
164 pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
166 self.options.optimizer.filter_null_join_keys = filter_null_keys;
167 self
168 }
169
170 pub fn with_query_execution_start_time(
173 mut self,
174 query_execution_tart_time: DateTime<Utc>,
175 ) -> Self {
176 self.query_execution_start_time = query_execution_tart_time;
177 self
178 }
179
180 pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
183 self.options.optimizer.skip_failed_rules = b;
184 self
185 }
186
187 pub fn with_max_passes(mut self, v: u8) -> Self {
189 self.options.optimizer.max_passes = v as usize;
190 self
191 }
192}
193
194impl Default for OptimizerContext {
195 fn default() -> Self {
197 Self::new()
198 }
199}
200
201impl OptimizerConfig for OptimizerContext {
202 fn query_execution_start_time(&self) -> DateTime<Utc> {
203 self.query_execution_start_time
204 }
205
206 fn alias_generator(&self) -> &Arc<AliasGenerator> {
207 &self.alias_generator
208 }
209
210 fn options(&self) -> &ConfigOptions {
211 &self.options
212 }
213}
214
/// Applies a sequence of [`OptimizerRule`]s to a [`LogicalPlan`]; see
/// [`Optimizer::optimize`].
#[derive(Clone, Debug)]
pub struct Optimizer {
    /// Rules applied, in this order, on every optimizer pass.
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
221
/// Order in which the optimizer visits plan nodes for a rule whose
/// [`OptimizerRule::apply_order`] returns `Some(..)`.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq`: the enum has no
/// fields, so equality is total, and callers can use it as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApplyOrder {
    /// Apply the rule to a node before visiting its children (pre-order).
    TopDown,
    /// Apply the rule to a node after visiting its children (post-order).
    BottomUp,
}
233
234impl Default for Optimizer {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240impl Optimizer {
241 pub fn new() -> Self {
243 let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
244 Arc::new(EliminateNestedUnion::new()),
245 Arc::new(SimplifyExpressions::new()),
246 Arc::new(UnwrapCastInComparison::new()),
247 Arc::new(ReplaceDistinctWithAggregate::new()),
248 Arc::new(EliminateJoin::new()),
249 Arc::new(DecorrelatePredicateSubquery::new()),
250 Arc::new(ScalarSubqueryToJoin::new()),
251 Arc::new(ExtractEquijoinPredicate::new()),
252 Arc::new(EliminateDuplicatedExpr::new()),
253 Arc::new(EliminateFilter::new()),
254 Arc::new(EliminateCrossJoin::new()),
255 Arc::new(CommonSubexprEliminate::new()),
256 Arc::new(EliminateLimit::new()),
257 Arc::new(PropagateEmptyRelation::new()),
258 Arc::new(EliminateOneUnion::new()),
260 Arc::new(FilterNullJoinKeys::default()),
261 Arc::new(EliminateOuterJoin::new()),
262 Arc::new(PushDownLimit::new()),
264 Arc::new(PushDownFilter::new()),
265 Arc::new(SingleDistinctToGroupBy::new()),
266 Arc::new(SimplifyExpressions::new()),
269 Arc::new(UnwrapCastInComparison::new()),
270 Arc::new(CommonSubexprEliminate::new()),
271 Arc::new(EliminateGroupByConstant::new()),
272 Arc::new(OptimizeProjections::new()),
273 ];
274
275 Self::with_rules(rules)
276 }
277
278 pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
280 Self { rules }
281 }
282}
283
/// [`TreeNodeRewriter`] adapter that applies one [`OptimizerRule`] to every
/// node it visits, either on the way down ([`ApplyOrder::TopDown`]) or on the
/// way up ([`ApplyOrder::BottomUp`]).
struct Rewriter<'a> {
    /// Selects which traversal hook (`f_down` or `f_up`) invokes the rule.
    apply_order: ApplyOrder,
    /// The rule being applied.
    rule: &'a dyn OptimizerRule,
    /// Configuration passed through to the rule.
    config: &'a dyn OptimizerConfig,
}
290
291impl<'a> Rewriter<'a> {
292 fn new(
293 apply_order: ApplyOrder,
294 rule: &'a dyn OptimizerRule,
295 config: &'a dyn OptimizerConfig,
296 ) -> Self {
297 Self {
298 apply_order,
299 rule,
300 config,
301 }
302 }
303}
304
305impl TreeNodeRewriter for Rewriter<'_> {
306 type Node = LogicalPlan;
307
308 fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
309 if self.apply_order == ApplyOrder::TopDown {
310 optimize_plan_node(node, self.rule, self.config)
311 } else {
312 Ok(Transformed::no(node))
313 }
314 }
315
316 fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
317 if self.apply_order == ApplyOrder::BottomUp {
318 optimize_plan_node(node, self.rule, self.config)
319 } else {
320 Ok(Transformed::no(node))
321 }
322 }
323}
324
325fn optimize_plan_node(
327 plan: LogicalPlan,
328 rule: &dyn OptimizerRule,
329 config: &dyn OptimizerConfig,
330) -> Result<Transformed<LogicalPlan>> {
331 if rule.supports_rewrite() {
332 return rule.rewrite(plan, config);
333 }
334
335 #[allow(deprecated)]
336 rule.try_optimize(&plan, config).map(|maybe_plan| {
337 match maybe_plan {
338 Some(new_plan) => {
339 Transformed::yes(new_plan)
341 }
342 None => Transformed::no(plan),
343 }
344 })
345}
346
impl Optimizer {
    /// Optimizes `plan` by repeatedly applying every rule in [`Self::rules`].
    ///
    /// Each pass runs all rules once, in order. Passes repeat until
    /// `config.options().optimizer.max_passes` passes have run, or until a
    /// pass yields a plan whose [`LogicalPlanSignature`] was already seen. A
    /// plan counts as seen if it matches the input or the output of an
    /// earlier pass.
    ///
    /// `observer` is called with the current plan and the rule after every
    /// successful rule application, whether or not the rule changed the plan.
    /// It is not called for the input plan, nor for rule applications that
    /// failed.
    ///
    /// # Errors
    ///
    /// * The input plan fails the [`InvariantLevel::Executable`] checks.
    /// * A rule fails and `optimizer.skip_failed_rules` is `false`. Failing
    ///   means the rule returned an error, or its output failed the post-rule
    ///   checks: the schema must be unchanged, and in debug builds the plan
    ///   must also pass the executable invariants. When the option is `true`,
    ///   the failure is logged as a warning and the plan from before that
    ///   rule is kept.
    /// * After all passes, the plan's schema no longer matches the input
    ///   schema, or the plan fails the executable invariants.
    pub fn optimize<F>(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
        mut observer: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
    {
        // Reject invalid input up front so later failures can be attributed to rules.
        plan.check_invariants(InvariantLevel::Executable)
            .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;

        let start_time = Instant::now();
        let options = config.options();
        let mut new_plan = plan;

        // Signatures of the input plan and of each pass's output; seeing one
        // again means another pass would only repeat earlier work.
        let mut previous_plans = HashSet::with_capacity(16);
        previous_plans.insert(LogicalPlanSignature::new(&new_plan));

        // Schema of the input plan; the final plan must still match it.
        let starting_schema = Arc::clone(new_plan.schema());

        let mut i = 0;
        while i < options.optimizer.max_passes {
            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);

            for rule in &self.rules {
                // Rewriting consumes the plan, so when failed rules may be
                // skipped, keep a copy to fall back to.
                let prev_plan = options
                    .optimizer
                    .skip_failed_rules
                    .then(|| new_plan.clone());

                // Schema before this rule runs (shadows the outer binding);
                // each individual rule must preserve it.
                let starting_schema = Arc::clone(new_plan.schema());

                let result = match rule.apply_order() {
                    // The optimizer drives the traversal, including subqueries.
                    Some(apply_order) => new_plan.rewrite_with_subqueries(
                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
                    ),
                    // The rule is invoked once on the root and recurses itself.
                    None => optimize_plan_node(new_plan, rule.as_ref(), config),
                }
                .and_then(|tnr| {
                    // Optimizer-specific invariants, checked after every rule.
                    assert_valid_optimization(&tnr.data, &starting_schema)
                        .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;

                    // The full executable-plan check runs per rule only in
                    // debug builds; release builds check once after all passes.
                    #[cfg(debug_assertions)]
                    tnr.data.check_invariants(InvariantLevel::Executable)
                        .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;

                    Ok(tnr)
                });

                match (result, prev_plan) {
                    // Rule succeeded: adopt its output and notify the observer.
                    (
                        Ok(Transformed {
                            data, transformed, ..
                        }),
                        _,
                    ) => {
                        new_plan = data;
                        observer(&new_plan, rule.as_ref());
                        if transformed {
                            log_plan(rule.name(), &new_plan);
                        } else {
                            debug!(
                                "Plan unchanged by optimizer rule '{}' (pass {})",
                                rule.name(),
                                i
                            );
                        }
                    }
                    // Rule failed, but skip_failed_rules is on: warn and restore
                    // the copy taken before the rule ran.
                    (Err(e), Some(orig_plan)) => {
                        warn!(
                            "Skipping optimizer rule '{}' due to unexpected error: {}",
                            rule.name(),
                            e
                        );
                        new_plan = orig_plan;
                    }
                    // Rule failed and skip_failed_rules is off: propagate.
                    (Err(e), None) => {
                        return Err(e.context(format!(
                            "Optimizer rule '{}' failed",
                            rule.name()
                        )));
                    }
                }
            }
            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);

            // `HashSet::insert` returns false if this signature was already present.
            let plan_is_fresh =
                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
            if !plan_is_fresh {
                // Fixed point (or cycle) reached: further passes are pointless.
                debug!("optimizer pass {} did not make changes", i);
                break;
            }
            i += 1;
        }

        // The schema must match the original input after all passes.
        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
            e.context("Check optimizer-specific invariants after all passes")
        })?;

        // The final plan must be executable (checked in all build profiles).
        new_plan
            .check_invariants(InvariantLevel::Executable)
            .map_err(|e| {
                e.context("Invalid (non-executable) plan after LP Optimizers")
            })?;

        log_plan("Final optimized plan", &new_plan);
        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
        Ok(new_plan)
    }
}
480
481fn assert_valid_optimization(
486 plan: &LogicalPlan,
487 prev_schema: &Arc<DFSchema>,
488) -> Result<()> {
489 assert_expected_schema(prev_schema, plan)?;
492
493 Ok(())
494}
495
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{plan_err, DFSchema, DFSchemaRef, DataFusionError, Result};
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    // With `skip_failing_rules(true)`, an erroring rule is skipped and
    // `optimize` still succeeds.
    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    // With `skip_failing_rules(false)`, the rule's error is returned, wrapped
    // in context that names the rule.
    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    // A rule that changes the plan's schema fails the post-rule invariant
    // check. Without skipping, that failure is returned with both schemas in
    // the message.
    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();
        assert!(err.strip_backtrace().starts_with(
            "Optimizer rule 'get table_scan rule' failed\n\
            caused by\n\
            Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\
            caused by\n\
            Internal error: Failed due to a difference in schemas, \
            original schema: DFSchema { inner: Schema { \
            fields: [], \
            metadata: {} }, \
            field_qualifiers: [], \
            functional_dependencies: FunctionalDependencies { deps: [] } \
            }, \
            new schema: DFSchema { inner: Schema { \
            fields: [\
              Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
              Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
              Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }\
            ], \
            metadata: {} }, \
            field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })], \
            functional_dependencies: FunctionalDependencies { deps: [] } }",
        ));
    }

    // The same schema-changing rule is tolerated when failing rules are skipped.
    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        // The rule replaces a projection whose fields carry metadata with a
        // plan whose fields do not. Optimization is expected to succeed even
        // with skipping disabled.
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // Before optimizing, the schemas differ (only in field metadata).
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // After optimizing, the metadata is gone and the schemas match.
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        // The rule rotates the inner projection's columns once per pass:
        // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            // Outer single-column projection keeps the output schema stable
            // while the inner projection is rotated.
            .project([lit(100)])?
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // The observer never sees the initial plan, so three passes give
        // three plans.
        assert_eq!(3, plans.len());

        // The third pass reproduces the initial plan, which stops the loop.
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        // The rule reverses the columns on the first pass, then rotates:
        // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            // Outer single-column projection keeps the output schema stable.
            .project([lit(100)])?
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // The observer never sees the initial plan, so four passes give four
        // plans.
        assert_eq!(4, plans.len());

        // The fourth pass reproduces the first pass's output ([3, 2, 1]).
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` in which field `i` carries the metadata
    /// entry `"key" -> "value {i}"`.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// No-op observer for tests that don't inspect intermediate plans.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// Rule that always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Rule that replaces any plan with `test_table_scan()`, so its output
    /// schema generally differs from its input's.
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// Rule that, top-down, rotates the expressions of every projection with
    /// at least two expressions left by one. On its first application it can
    /// instead reverse them. It always reports the plan as transformed.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        // When true, the next application reverses instead of rotating and then
        // clears the flag. The `Mutex` provides mutation through `&self`.
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            // Only multi-expression projections are rewritten; everything else
            // (including the single-column outer projection) passes through.
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut exprs = projection.expr.clone();

            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
            if *reverse {
                exprs.reverse();
                *reverse = false;
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, Arc::clone(&projection.input))?,
            )))
        }
    }
}