1use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{assert_expected_schema, InvariantLevel};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_nested_union::EliminateNestedUnion;
45use crate::eliminate_one_union::EliminateOneUnion;
46use crate::eliminate_outer_join::EliminateOuterJoin;
47use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
48use crate::filter_null_join_keys::FilterNullJoinKeys;
49use crate::optimize_projections::OptimizeProjections;
50use crate::plan_signature::LogicalPlanSignature;
51use crate::propagate_empty_relation::PropagateEmptyRelation;
52use crate::push_down_filter::PushDownFilter;
53use crate::push_down_limit::PushDownLimit;
54use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
56use crate::simplify_expressions::SimplifyExpressions;
57use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
58use crate::utils::log_plan;
59
60pub trait OptimizerRule: Debug {
73 fn name(&self) -> &str;
75
76 fn apply_order(&self) -> Option<ApplyOrder> {
81 None
82 }
83
84 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
86 fn supports_rewrite(&self) -> bool {
87 true
88 }
89
90 fn rewrite(
93 &self,
94 _plan: LogicalPlan,
95 _config: &dyn OptimizerConfig,
96 ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
97 internal_err!("rewrite is not implemented for {}", self.name())
98 }
99}
100
101pub trait OptimizerConfig {
103 fn query_execution_start_time(&self) -> DateTime<Utc>;
106
107 fn alias_generator(&self) -> &Arc<AliasGenerator>;
109
110 fn options(&self) -> Arc<ConfigOptions>;
111
112 fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
113 None
114 }
115}
116
117#[derive(Debug)]
120pub struct OptimizerContext {
121 query_execution_start_time: DateTime<Utc>,
124
125 alias_generator: Arc<AliasGenerator>,
127
128 options: Arc<ConfigOptions>,
129}
130
131impl OptimizerContext {
132 pub fn new() -> Self {
134 let mut options = ConfigOptions::default();
135 options.optimizer.filter_null_join_keys = true;
136
137 Self::new_with_config_options(Arc::new(options))
138 }
139
140 pub fn new_with_config_options(options: Arc<ConfigOptions>) -> Self {
142 Self {
143 query_execution_start_time: Utc::now(),
144 alias_generator: Arc::new(AliasGenerator::new()),
145 options,
146 }
147 }
148
149 pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
151 Arc::make_mut(&mut self.options)
152 .optimizer
153 .filter_null_join_keys = filter_null_keys;
154 self
155 }
156
157 pub fn with_query_execution_start_time(
160 mut self,
161 query_execution_tart_time: DateTime<Utc>,
162 ) -> Self {
163 self.query_execution_start_time = query_execution_tart_time;
164 self
165 }
166
167 pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
170 Arc::make_mut(&mut self.options).optimizer.skip_failed_rules = b;
171 self
172 }
173
174 pub fn with_max_passes(mut self, v: u8) -> Self {
176 Arc::make_mut(&mut self.options).optimizer.max_passes = v as usize;
177 self
178 }
179}
180
181impl Default for OptimizerContext {
182 fn default() -> Self {
184 Self::new()
185 }
186}
187
188impl OptimizerConfig for OptimizerContext {
189 fn query_execution_start_time(&self) -> DateTime<Utc> {
190 self.query_execution_start_time
191 }
192
193 fn alias_generator(&self) -> &Arc<AliasGenerator> {
194 &self.alias_generator
195 }
196
197 fn options(&self) -> Arc<ConfigOptions> {
198 Arc::clone(&self.options)
199 }
200}
201
202#[derive(Clone, Debug)]
204pub struct Optimizer {
205 pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
207}
208
/// Direction in which a rule is applied to the nodes of a plan tree when the
/// optimizer performs the traversal on the rule's behalf.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so
/// equality is total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOrder {
    /// The rule is invoked on a node while descending, before its children
    /// are visited.
    TopDown,
    /// The rule is invoked on a node while ascending, after its children
    /// have been visited.
    BottomUp,
}
220
221impl Default for Optimizer {
222 fn default() -> Self {
223 Self::new()
224 }
225}
226
227impl Optimizer {
228 pub fn new() -> Self {
230 let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
231 Arc::new(EliminateNestedUnion::new()),
232 Arc::new(SimplifyExpressions::new()),
233 Arc::new(ReplaceDistinctWithAggregate::new()),
234 Arc::new(EliminateJoin::new()),
235 Arc::new(DecorrelatePredicateSubquery::new()),
236 Arc::new(ScalarSubqueryToJoin::new()),
237 Arc::new(DecorrelateLateralJoin::new()),
238 Arc::new(ExtractEquijoinPredicate::new()),
239 Arc::new(EliminateDuplicatedExpr::new()),
240 Arc::new(EliminateFilter::new()),
241 Arc::new(EliminateCrossJoin::new()),
242 Arc::new(EliminateLimit::new()),
243 Arc::new(PropagateEmptyRelation::new()),
244 Arc::new(EliminateOneUnion::new()),
246 Arc::new(FilterNullJoinKeys::default()),
247 Arc::new(EliminateOuterJoin::new()),
248 Arc::new(PushDownLimit::new()),
250 Arc::new(PushDownFilter::new()),
251 Arc::new(SingleDistinctToGroupBy::new()),
252 Arc::new(EliminateGroupByConstant::new()),
255 Arc::new(CommonSubexprEliminate::new()),
256 Arc::new(OptimizeProjections::new()),
257 ];
258
259 Self::with_rules(rules)
260 }
261
262 pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
264 Self { rules }
265 }
266}
267
268struct Rewriter<'a> {
270 apply_order: ApplyOrder,
271 rule: &'a dyn OptimizerRule,
272 config: &'a dyn OptimizerConfig,
273}
274
275impl<'a> Rewriter<'a> {
276 fn new(
277 apply_order: ApplyOrder,
278 rule: &'a dyn OptimizerRule,
279 config: &'a dyn OptimizerConfig,
280 ) -> Self {
281 Self {
282 apply_order,
283 rule,
284 config,
285 }
286 }
287}
288
289impl TreeNodeRewriter for Rewriter<'_> {
290 type Node = LogicalPlan;
291
292 fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
293 if self.apply_order == ApplyOrder::TopDown {
294 {
295 self.rule.rewrite(node, self.config)
296 }
297 } else {
298 Ok(Transformed::no(node))
299 }
300 }
301
302 fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
303 if self.apply_order == ApplyOrder::BottomUp {
304 {
305 self.rule.rewrite(node, self.config)
306 }
307 } else {
308 Ok(Transformed::no(node))
309 }
310 }
311}
312
313impl Optimizer {
314 pub fn optimize<F>(
317 &self,
318 plan: LogicalPlan,
319 config: &dyn OptimizerConfig,
320 mut observer: F,
321 ) -> Result<LogicalPlan>
322 where
323 F: FnMut(&LogicalPlan, &dyn OptimizerRule),
324 {
325 plan.check_invariants(InvariantLevel::Executable)
327 .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
328
329 let start_time = Instant::now();
330 let options = config.options();
331 let mut new_plan = plan;
332
333 let mut previous_plans = HashSet::with_capacity(16);
334 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
335
336 let starting_schema = Arc::clone(new_plan.schema());
337
338 let mut i = 0;
339 while i < options.optimizer.max_passes {
340 log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
341
342 for rule in &self.rules {
343 let prev_plan = options
346 .optimizer
347 .skip_failed_rules
348 .then(|| new_plan.clone());
349
350 let starting_schema = Arc::clone(new_plan.schema());
351
352 let result = match rule.apply_order() {
353 Some(apply_order) => new_plan.rewrite_with_subqueries(
355 &mut Rewriter::new(apply_order, rule.as_ref(), config),
356 ),
357 None => {
359 rule.rewrite(new_plan, config)
360 },
361 }
362 .and_then(|tnr| {
363 assert_valid_optimization(&tnr.data, &starting_schema)
365 .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
366
367 #[cfg(debug_assertions)]
369 tnr.data.check_invariants(InvariantLevel::Executable)
370 .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
371
372 Ok(tnr)
373 });
374
375 match (result, prev_plan) {
377 (
379 Ok(Transformed {
380 data, transformed, ..
381 }),
382 _,
383 ) => {
384 new_plan = data;
385 observer(&new_plan, rule.as_ref());
386 if transformed {
387 log_plan(rule.name(), &new_plan);
388 } else {
389 debug!(
390 "Plan unchanged by optimizer rule '{}' (pass {})",
391 rule.name(),
392 i
393 );
394 }
395 }
396 (Err(e), Some(orig_plan)) => {
399 warn!(
403 "Skipping optimizer rule '{}' due to unexpected error: {}",
404 rule.name(),
405 e
406 );
407 new_plan = orig_plan;
408 }
409 (Err(e), None) => {
411 return Err(e.context(format!(
412 "Optimizer rule '{}' failed",
413 rule.name()
414 )));
415 }
416 }
417 }
418 log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
419
420 let plan_is_fresh =
422 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
423 if !plan_is_fresh {
424 debug!("optimizer pass {i} did not make changes");
426 break;
427 }
428 i += 1;
429 }
430
431 assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
433 e.context("Check optimizer-specific invariants after all passes")
434 })?;
435
436 new_plan
438 .check_invariants(InvariantLevel::Executable)
439 .map_err(|e| {
440 e.context("Invalid (non-executable) plan after LP Optimizers")
441 })?;
442
443 log_plan("Final optimized plan", &new_plan);
444 debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
445 Ok(new_plan)
446 }
447}
448
449fn assert_valid_optimization(
454 plan: &LogicalPlan,
455 prev_schema: &Arc<DFSchema>,
456) -> Result<()> {
457 assert_expected_schema(prev_schema, plan)?;
460
461 Ok(())
462}
463
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// Plan shared by several tests: an `EmptyRelation` with an empty schema
    /// that produces no rows.
    fn empty_relation_plan() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    /// A rule that always errors is tolerated when failing rules are skipped.
    #[test]
    fn skip_failing_rule() {
        let optimizer = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let context = OptimizerContext::new().with_skip_failing_rules(true);
        optimizer
            .optimize(empty_relation_plan(), &context, &observe)
            .unwrap();
    }

    /// Without skipping, the rule's error is returned, wrapped in context
    /// naming the rule.
    #[test]
    fn no_skip_failing_rule() {
        let optimizer = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let context = OptimizerContext::new().with_skip_failing_rules(false);
        let failure = optimizer
            .optimize(empty_relation_plan(), &context, &observe)
            .unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
             Error during planning: rule failed",
            failure.strip_backtrace()
        );
    }

    /// A rule whose output has a different schema is rejected.
    #[test]
    fn generate_different_schema() {
        let optimizer = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let context = OptimizerContext::new().with_skip_failing_rules(false);
        let failure = optimizer
            .optimize(empty_relation_plan(), &context, &observe)
            .unwrap_err();

        // The exact message is long; checking the leading part is enough.
        assert_contains!(
            failure.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    /// The schema mismatch above is ignored when failing rules are skipped.
    #[test]
    fn skip_generate_different_schema() {
        let optimizer = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let context = OptimizerContext::new().with_skip_failing_rules(true);
        optimizer
            .optimize(empty_relation_plan(), &context, &observe)
            .unwrap();
    }

    /// The input projection's schema is the table scan's schema with extra
    /// field metadata; replacing the plan with the bare table scan must not
    /// be reported as a schema change.
    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        let optimizer = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let context = OptimizerContext::new().with_skip_failing_rules(false);

        let scan = Arc::new(test_table_scan()?);
        let scan_schema = Arc::clone(scan.schema());

        let projection_with_metadata =
            LogicalPlan::Projection(Projection::try_new_with_schema(
                vec![col("a"), col("b"), col("c")],
                scan,
                add_metadata_to_fields(scan_schema.as_ref()),
            )?);

        // Sanity check: the metadata really makes the two schemas unequal.
        assert_ne!(
            projection_with_metadata.schema().as_ref(),
            scan_schema.as_ref()
        );
        let optimized_plan =
            optimizer.optimize(projection_with_metadata, &context, &observe)?;
        // The rule's output is the table scan, so the metadata is gone.
        assert_eq!(optimized_plan.schema().as_ref(), scan_schema.as_ref());
        Ok(())
    }

    /// Rotating three expressions left once per pass reproduces the initial
    /// plan after the third pass, at which point the optimizer must stop.
    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        let optimizer =
            Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let context = OptimizerContext::new().with_max_passes(16);

        // The outer single-expression projection is never touched by the
        // rule; only the inner three-expression projection rotates.
        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut observed: Vec<LogicalPlan> = Vec::new();
        let final_plan = optimizer
            .optimize(initial_plan.clone(), &context, |p, _| {
                observed.push(p.clone())
            })?;

        // One observed plan per pass: three passes ran.
        assert_eq!(3, observed.len());

        // The last pass landed back on the initial plan.
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    /// Reversing on the first pass and rotating afterwards revisits the
    /// first pass's output (not the initial plan) on the fourth pass; the
    /// optimizer must notice that as well.
    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        let optimizer =
            Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let context = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()?;

        let mut observed: Vec<LogicalPlan> = Vec::new();
        let final_plan = optimizer
            .optimize(initial_plan, &context, |p, _| observed.push(p.clone()))?;

        // One observed plan per pass: four passes ran.
        assert_eq!(4, observed.len());

        // The final plan equals the plan produced by the first pass.
        assert_eq!(observed[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` in which every field carries the metadata
    /// entry `"key" -> "value {index}"`; schema-level metadata is copied
    /// unchanged.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let fields_with_metadata = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let annotated_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(annotated_field))
            })
            .collect::<Vec<_>>();

        let schema_metadata = schema.metadata().clone();
        Arc::new(
            DFSchema::new_with_metadata(fields_with_metadata, schema_metadata)
                .unwrap(),
        )
    }

    /// Observer that ignores everything it is shown.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// Rule whose `rewrite` always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Rule that discards its input and returns the test table scan instead.
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let scan = test_table_scan()?;
            let rebuilt = LogicalPlanBuilder::from(scan).build()?;
            Ok(Transformed::yes(rebuilt))
        }
    }

    /// Rule that reorders the expressions of any projection holding at least
    /// two of them: rotate left by one, or, if the flag is still set,
    /// reverse once and clear the flag.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        /// `true` until the first eligible projection has been reversed.
        /// Kept in a `Mutex` because `rewrite` only receives `&self`.
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            // Anything other than a projection with two or more expressions
            // passes through untouched (and leaves the flag alone).
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut reordered = projection.expr.clone();

            let mut reverse_pending = self.reverse_on_first_pass.lock().unwrap();
            if *reverse_pending {
                reordered.reverse();
                *reverse_pending = false;
            } else {
                reordered.rotate_left(1);
            }

            let rebuilt = Projection::try_new(reordered, Arc::clone(&projection.input))?;
            Ok(Transformed::yes(LogicalPlan::Projection(rebuilt)))
        }
    }
}