1use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{assert_expected_schema, InvariantLevel};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_nested_union::EliminateNestedUnion;
45use crate::eliminate_one_union::EliminateOneUnion;
46use crate::eliminate_outer_join::EliminateOuterJoin;
47use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
48use crate::filter_null_join_keys::FilterNullJoinKeys;
49use crate::optimize_projections::OptimizeProjections;
50use crate::plan_signature::LogicalPlanSignature;
51use crate::propagate_empty_relation::PropagateEmptyRelation;
52use crate::push_down_filter::PushDownFilter;
53use crate::push_down_limit::PushDownLimit;
54use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
56use crate::simplify_expressions::SimplifyExpressions;
57use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
58use crate::utils::log_plan;
59
/// A transformation applied to a [`LogicalPlan`] by the [`Optimizer`].
///
/// The rewritten plan must keep the same output schema as its input;
/// [`Optimizer::optimize`] checks this after every rule application.
pub trait OptimizerRule: Debug {
    /// Human-readable name of this rule, used in optimizer log messages and
    /// in the error context reported when the rule fails.
    fn name(&self) -> &str;

    /// How the optimizer should drive [`Self::rewrite`].
    ///
    /// * `None` (the default): `rewrite` is called once with the whole plan,
    ///   and the rule is responsible for any recursion into children.
    /// * `Some(order)`: the optimizer traverses the plan, including
    ///   subqueries, and calls `rewrite` on every node in the given order.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    /// Deprecated: the optimizer no longer consults this method and always
    /// calls [`Self::rewrite`].
    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
    fn supports_rewrite(&self) -> bool {
        true
    }

    /// Rewrites `plan`, or a single node of it when [`Self::apply_order`]
    /// returns `Some`.
    ///
    /// The `transformed` flag of the returned [`Transformed`] should report
    /// whether the plan changed; the optimizer uses it to decide what to log.
    ///
    /// The default implementation returns an internal error, so every rule
    /// is expected to override it.
    fn rewrite(
        &self,
        _plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        internal_err!("rewrite is not implemented for {}", self.name())
    }
}
100
/// State and options made available to [`OptimizerRule`]s while a plan is
/// being optimized.
pub trait OptimizerConfig {
    /// Time at which the query started executing.
    ///
    /// NOTE(review): presumably lets rules evaluate time-dependent
    /// expressions against one consistent instant — confirm with callers.
    fn query_execution_start_time(&self) -> DateTime<Utc>;

    /// Alias generator shared (via `Arc`) by the rules of one optimization.
    fn alias_generator(&self) -> &Arc<AliasGenerator>;

    /// Configuration options. [`Optimizer::optimize`] reads
    /// `optimizer.max_passes` and `optimizer.skip_failed_rules` from here.
    fn options(&self) -> Arc<ConfigOptions>;

    /// Optional function registry; `None` unless an implementation
    /// overrides it.
    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
        None
    }
}
116
/// A self-contained [`OptimizerConfig`] that owns its start time, alias
/// generator and configuration options.
#[derive(Debug)]
pub struct OptimizerContext {
    /// Value returned by [`OptimizerConfig::query_execution_start_time`].
    query_execution_start_time: DateTime<Utc>,

    /// Generator returned by [`OptimizerConfig::alias_generator`].
    alias_generator: Arc<AliasGenerator>,

    /// Options returned by [`OptimizerConfig::options`]; the builder methods
    /// mutate them through `Arc::make_mut`.
    options: Arc<ConfigOptions>,
}
130
131impl OptimizerContext {
132 pub fn new() -> Self {
134 let mut options = ConfigOptions::default();
135 options.optimizer.filter_null_join_keys = true;
136
137 Self {
138 query_execution_start_time: Utc::now(),
139 alias_generator: Arc::new(AliasGenerator::new()),
140 options: Arc::new(options),
141 }
142 }
143
144 pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
146 Arc::make_mut(&mut self.options)
147 .optimizer
148 .filter_null_join_keys = filter_null_keys;
149 self
150 }
151
152 pub fn with_query_execution_start_time(
155 mut self,
156 query_execution_tart_time: DateTime<Utc>,
157 ) -> Self {
158 self.query_execution_start_time = query_execution_tart_time;
159 self
160 }
161
162 pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
165 Arc::make_mut(&mut self.options).optimizer.skip_failed_rules = b;
166 self
167 }
168
169 pub fn with_max_passes(mut self, v: u8) -> Self {
171 Arc::make_mut(&mut self.options).optimizer.max_passes = v as usize;
172 self
173 }
174}
175
176impl Default for OptimizerContext {
177 fn default() -> Self {
179 Self::new()
180 }
181}
182
183impl OptimizerConfig for OptimizerContext {
184 fn query_execution_start_time(&self) -> DateTime<Utc> {
185 self.query_execution_start_time
186 }
187
188 fn alias_generator(&self) -> &Arc<AliasGenerator> {
189 &self.alias_generator
190 }
191
192 fn options(&self) -> Arc<ConfigOptions> {
193 Arc::clone(&self.options)
194 }
195}
196
/// Applies a list of [`OptimizerRule`]s to a [`LogicalPlan`], possibly over
/// several passes (see [`Optimizer::optimize`]).
#[derive(Clone, Debug)]
pub struct Optimizer {
    /// Rules applied in order on every pass; each rule receives the output of
    /// the previous one.
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
203
/// Controls how the [`Optimizer`] drives a rule whose
/// [`OptimizerRule::apply_order`] returns `Some(_)`.
///
/// The optimizer walks the whole plan, including subqueries, and calls
/// [`OptimizerRule::rewrite`] on each node either on the way down or on the
/// way back up.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the order can be
/// used as a key in maps and sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApplyOrder {
    /// Rewrite a node before visiting its children.
    TopDown,
    /// Rewrite a node after visiting its children.
    BottomUp,
}
215
216impl Default for Optimizer {
217 fn default() -> Self {
218 Self::new()
219 }
220}
221
impl Optimizer {
    /// Creates an optimizer with the built-in set of logical optimizer rules.
    ///
    /// Rules run sequentially in the order listed, each receiving the output
    /// of the previous one, so the order of this list is significant.
    pub fn new() -> Self {
        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
            Arc::new(EliminateNestedUnion::new()),
            Arc::new(SimplifyExpressions::new()),
            Arc::new(ReplaceDistinctWithAggregate::new()),
            Arc::new(EliminateJoin::new()),
            Arc::new(DecorrelatePredicateSubquery::new()),
            Arc::new(ScalarSubqueryToJoin::new()),
            Arc::new(DecorrelateLateralJoin::new()),
            Arc::new(ExtractEquijoinPredicate::new()),
            Arc::new(EliminateDuplicatedExpr::new()),
            Arc::new(EliminateFilter::new()),
            Arc::new(EliminateCrossJoin::new()),
            Arc::new(EliminateLimit::new()),
            Arc::new(PropagateEmptyRelation::new()),
            Arc::new(EliminateOneUnion::new()),
            Arc::new(FilterNullJoinKeys::default()),
            Arc::new(EliminateOuterJoin::new()),
            Arc::new(PushDownLimit::new()),
            Arc::new(PushDownFilter::new()),
            Arc::new(SingleDistinctToGroupBy::new()),
            Arc::new(EliminateGroupByConstant::new()),
            Arc::new(CommonSubexprEliminate::new()),
            Arc::new(OptimizeProjections::new()),
        ];

        Self::with_rules(rules)
    }

    /// Creates an optimizer that applies exactly `rules`, in the given order.
    pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
        Self { rules }
    }
}
262
/// Adapts an [`OptimizerRule`] to [`TreeNodeRewriter`] so the optimizer can
/// apply it to every node of a plan in the requested [`ApplyOrder`].
struct Rewriter<'a> {
    /// Whether the rule runs in `f_down` (top-down) or `f_up` (bottom-up).
    apply_order: ApplyOrder,
    /// Rule applied to each visited node.
    rule: &'a dyn OptimizerRule,
    /// Configuration forwarded to every `rule.rewrite` call.
    config: &'a dyn OptimizerConfig,
}
269
270impl<'a> Rewriter<'a> {
271 fn new(
272 apply_order: ApplyOrder,
273 rule: &'a dyn OptimizerRule,
274 config: &'a dyn OptimizerConfig,
275 ) -> Self {
276 Self {
277 apply_order,
278 rule,
279 config,
280 }
281 }
282}
283
284impl TreeNodeRewriter for Rewriter<'_> {
285 type Node = LogicalPlan;
286
287 fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
288 if self.apply_order == ApplyOrder::TopDown {
289 {
290 self.rule.rewrite(node, self.config)
291 }
292 } else {
293 Ok(Transformed::no(node))
294 }
295 }
296
297 fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
298 if self.apply_order == ApplyOrder::BottomUp {
299 {
300 self.rule.rewrite(node, self.config)
301 }
302 } else {
303 Ok(Transformed::no(node))
304 }
305 }
306}
307
308impl Optimizer {
309 pub fn optimize<F>(
312 &self,
313 plan: LogicalPlan,
314 config: &dyn OptimizerConfig,
315 mut observer: F,
316 ) -> Result<LogicalPlan>
317 where
318 F: FnMut(&LogicalPlan, &dyn OptimizerRule),
319 {
320 plan.check_invariants(InvariantLevel::Executable)
322 .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
323
324 let start_time = Instant::now();
325 let options = config.options();
326 let mut new_plan = plan;
327
328 let mut previous_plans = HashSet::with_capacity(16);
329 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
330
331 let starting_schema = Arc::clone(new_plan.schema());
332
333 let mut i = 0;
334 while i < options.optimizer.max_passes {
335 log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
336
337 for rule in &self.rules {
338 let prev_plan = options
341 .optimizer
342 .skip_failed_rules
343 .then(|| new_plan.clone());
344
345 let starting_schema = Arc::clone(new_plan.schema());
346
347 let result = match rule.apply_order() {
348 Some(apply_order) => new_plan.rewrite_with_subqueries(
350 &mut Rewriter::new(apply_order, rule.as_ref(), config),
351 ),
352 None => {
354 rule.rewrite(new_plan, config)
355 },
356 }
357 .and_then(|tnr| {
358 assert_valid_optimization(&tnr.data, &starting_schema)
360 .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
361
362 #[cfg(debug_assertions)]
364 tnr.data.check_invariants(InvariantLevel::Executable)
365 .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
366
367 Ok(tnr)
368 });
369
370 match (result, prev_plan) {
372 (
374 Ok(Transformed {
375 data, transformed, ..
376 }),
377 _,
378 ) => {
379 new_plan = data;
380 observer(&new_plan, rule.as_ref());
381 if transformed {
382 log_plan(rule.name(), &new_plan);
383 } else {
384 debug!(
385 "Plan unchanged by optimizer rule '{}' (pass {})",
386 rule.name(),
387 i
388 );
389 }
390 }
391 (Err(e), Some(orig_plan)) => {
394 warn!(
398 "Skipping optimizer rule '{}' due to unexpected error: {}",
399 rule.name(),
400 e
401 );
402 new_plan = orig_plan;
403 }
404 (Err(e), None) => {
406 return Err(e.context(format!(
407 "Optimizer rule '{}' failed",
408 rule.name()
409 )));
410 }
411 }
412 }
413 log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
414
415 let plan_is_fresh =
417 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
418 if !plan_is_fresh {
419 debug!("optimizer pass {i} did not make changes");
421 break;
422 }
423 i += 1;
424 }
425
426 assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
428 e.context("Check optimizer-specific invariants after all passes")
429 })?;
430
431 new_plan
433 .check_invariants(InvariantLevel::Executable)
434 .map_err(|e| {
435 e.context("Invalid (non-executable) plan after LP Optimizers")
436 })?;
437
438 log_plan("Final optimized plan", &new_plan);
439 debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
440 Ok(new_plan)
441 }
442}
443
444fn assert_valid_optimization(
449 plan: &LogicalPlan,
450 prev_schema: &Arc<DFSchema>,
451) -> Result<()> {
452 assert_expected_schema(prev_schema, plan)?;
455
456 Ok(())
457}
458
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// With `skip_failed_rules` enabled, a rule that always errors is skipped
    /// and `optimize` still succeeds.
    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    /// Without skipping, the rule's error is returned, wrapped in context
    /// that names the failing rule.
    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    /// A rule that changes the plan's output schema is rejected when
    /// failures are not skipped.
    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();

        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    /// A schema-changing rule is skipped, not fatal, when
    /// `skip_failed_rules` is enabled.
    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    /// A rewrite whose schema differs from the original only in field
    /// metadata passes the schema check.
    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // The projection's schema differs from the scan's only by metadata...
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // ...and replacing it with the plain table scan is accepted.
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    /// Rotating three projection expressions returns to the initial plan
    /// after three passes; the optimizer recognizes the repeated signature
    /// and stops.
    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        // The outer single-expression projection is untouched by the rule,
        // so the root schema stays stable while the inner one is rotated.
        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // Observed once per pass: [2,3,1], [3,1,2], [1,2,3] (== initial).
        assert_eq!(3, plans.len());

        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    /// Same as above, but the first pass reverses instead of rotating, so
    /// the cycle returns to the plan produced by the first pass rather than
    /// the initial one.
    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // Observed: [3,2,1] (reversed), [2,1,3], [1,3,2], [3,2,1] (== plans[0]).
        assert_eq!(4, plans.len());

        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` in which the i-th field carries the
    /// metadata entry `"key" => "value {i}"`.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// No-op observer for tests that do not inspect intermediate plans.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// A rule whose `rewrite` always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// A rule that replaces whatever plan it is given with `test_table_scan()`,
    /// generally changing the plan's schema.
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// A top-down rule that rotates the expressions of every projection with
    /// at least two expressions left by one. When `reverse_on_first_pass` is
    /// set, the first matching rewrite reverses the expressions instead.
    /// The flag lives in a `Mutex` because `rewrite` only receives `&self`.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut exprs = projection.expr.clone();

            // Reverse once (if requested), then rotate on every later call.
            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
            if *reverse {
                exprs.reverse();
                *reverse = false;
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, Arc::clone(&projection.input))?,
            )))
        }
    }
}