1use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{InvariantLevel, assert_expected_schema};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{DFSchema, DataFusionError, HashSet, Result, internal_err};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_outer_join::EliminateOuterJoin;
45use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
46use crate::filter_null_join_keys::FilterNullJoinKeys;
47use crate::optimize_projections::OptimizeProjections;
48use crate::optimize_unions::OptimizeUnions;
49use crate::plan_signature::LogicalPlanSignature;
50use crate::propagate_empty_relation::PropagateEmptyRelation;
51use crate::push_down_filter::PushDownFilter;
52use crate::push_down_limit::PushDownLimit;
53use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
54use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
55use crate::simplify_expressions::SimplifyExpressions;
56use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
57use crate::utils::log_plan;
58
59pub trait OptimizerRule: Debug {
72 fn name(&self) -> &str;
74
75 fn apply_order(&self) -> Option<ApplyOrder> {
80 None
81 }
82
83 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
85 fn supports_rewrite(&self) -> bool {
86 true
87 }
88
89 fn rewrite(
92 &self,
93 _plan: LogicalPlan,
94 _config: &dyn OptimizerConfig,
95 ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
96 internal_err!("rewrite is not implemented for {}", self.name())
97 }
98}
99
100pub trait OptimizerConfig {
102 fn query_execution_start_time(&self) -> DateTime<Utc>;
105
106 fn alias_generator(&self) -> &Arc<AliasGenerator>;
108
109 fn options(&self) -> Arc<ConfigOptions>;
110
111 fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
112 None
113 }
114}
115
116#[derive(Debug)]
119pub struct OptimizerContext {
120 query_execution_start_time: DateTime<Utc>,
123
124 alias_generator: Arc<AliasGenerator>,
126
127 options: Arc<ConfigOptions>,
128}
129
130impl OptimizerContext {
131 pub fn new() -> Self {
133 let mut options = ConfigOptions::default();
134 options.optimizer.filter_null_join_keys = true;
135
136 Self::new_with_config_options(Arc::new(options))
137 }
138
139 pub fn new_with_config_options(options: Arc<ConfigOptions>) -> Self {
141 Self {
142 query_execution_start_time: Utc::now(),
143 alias_generator: Arc::new(AliasGenerator::new()),
144 options,
145 }
146 }
147
148 pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
150 Arc::make_mut(&mut self.options)
151 .optimizer
152 .filter_null_join_keys = filter_null_keys;
153 self
154 }
155
156 pub fn with_query_execution_start_time(
159 mut self,
160 query_execution_tart_time: DateTime<Utc>,
161 ) -> Self {
162 self.query_execution_start_time = query_execution_tart_time;
163 self
164 }
165
166 pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
169 Arc::make_mut(&mut self.options).optimizer.skip_failed_rules = b;
170 self
171 }
172
173 pub fn with_max_passes(mut self, v: u8) -> Self {
175 Arc::make_mut(&mut self.options).optimizer.max_passes = v as usize;
176 self
177 }
178}
179
180impl Default for OptimizerContext {
181 fn default() -> Self {
183 Self::new()
184 }
185}
186
187impl OptimizerConfig for OptimizerContext {
188 fn query_execution_start_time(&self) -> DateTime<Utc> {
189 self.query_execution_start_time
190 }
191
192 fn alias_generator(&self) -> &Arc<AliasGenerator> {
193 &self.alias_generator
194 }
195
196 fn options(&self) -> Arc<ConfigOptions> {
197 Arc::clone(&self.options)
198 }
199}
200
201#[derive(Clone, Debug)]
203pub struct Optimizer {
204 pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
206}
207
/// Direction in which the optimizer applies a rule while walking a plan.
///
/// Returned from [`OptimizerRule::apply_order`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ApplyOrder {
    /// The rule is invoked from the traversal's `f_down` hook.
    TopDown,
    /// The rule is invoked from the traversal's `f_up` hook.
    BottomUp,
}
219
220impl Default for Optimizer {
221 fn default() -> Self {
222 Self::new()
223 }
224}
225
226impl Optimizer {
227 pub fn new() -> Self {
229 let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
230 Arc::new(OptimizeUnions::new()),
231 Arc::new(SimplifyExpressions::new()),
232 Arc::new(ReplaceDistinctWithAggregate::new()),
233 Arc::new(EliminateJoin::new()),
234 Arc::new(DecorrelatePredicateSubquery::new()),
235 Arc::new(ScalarSubqueryToJoin::new()),
236 Arc::new(DecorrelateLateralJoin::new()),
237 Arc::new(ExtractEquijoinPredicate::new()),
238 Arc::new(EliminateDuplicatedExpr::new()),
239 Arc::new(EliminateFilter::new()),
240 Arc::new(EliminateCrossJoin::new()),
241 Arc::new(EliminateLimit::new()),
242 Arc::new(PropagateEmptyRelation::new()),
243 Arc::new(FilterNullJoinKeys::default()),
244 Arc::new(EliminateOuterJoin::new()),
245 Arc::new(PushDownLimit::new()),
247 Arc::new(PushDownFilter::new()),
248 Arc::new(SingleDistinctToGroupBy::new()),
249 Arc::new(EliminateGroupByConstant::new()),
252 Arc::new(CommonSubexprEliminate::new()),
253 Arc::new(OptimizeProjections::new()),
254 ];
255
256 Self::with_rules(rules)
257 }
258
259 pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
261 Self { rules }
262 }
263}
264
265struct Rewriter<'a> {
267 apply_order: ApplyOrder,
268 rule: &'a dyn OptimizerRule,
269 config: &'a dyn OptimizerConfig,
270}
271
272impl<'a> Rewriter<'a> {
273 fn new(
274 apply_order: ApplyOrder,
275 rule: &'a dyn OptimizerRule,
276 config: &'a dyn OptimizerConfig,
277 ) -> Self {
278 Self {
279 apply_order,
280 rule,
281 config,
282 }
283 }
284}
285
286impl TreeNodeRewriter for Rewriter<'_> {
287 type Node = LogicalPlan;
288
289 fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
290 if self.apply_order == ApplyOrder::TopDown {
291 self.rule.rewrite(node, self.config)
292 } else {
293 Ok(Transformed::no(node))
294 }
295 }
296
297 fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
298 if self.apply_order == ApplyOrder::BottomUp {
299 self.rule.rewrite(node, self.config)
300 } else {
301 Ok(Transformed::no(node))
302 }
303 }
304}
305
306impl Optimizer {
307 pub fn optimize<F>(
310 &self,
311 plan: LogicalPlan,
312 config: &dyn OptimizerConfig,
313 mut observer: F,
314 ) -> Result<LogicalPlan>
315 where
316 F: FnMut(&LogicalPlan, &dyn OptimizerRule),
317 {
318 plan.check_invariants(InvariantLevel::Executable)
320 .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
321
322 let start_time = Instant::now();
323 let options = config.options();
324 let mut new_plan = plan;
325
326 let mut previous_plans = HashSet::with_capacity(16);
327 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
328
329 let starting_schema = Arc::clone(new_plan.schema());
330
331 let mut i = 0;
332 while i < options.optimizer.max_passes {
333 log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
334
335 for rule in &self.rules {
336 let prev_plan = options
339 .optimizer
340 .skip_failed_rules
341 .then(|| new_plan.clone());
342
343 let starting_schema = Arc::clone(new_plan.schema());
344
345 let result = match rule.apply_order() {
346 Some(apply_order) => new_plan.rewrite_with_subqueries(
348 &mut Rewriter::new(apply_order, rule.as_ref(), config),
349 ),
350 None => {
352 rule.rewrite(new_plan, config)
353 },
354 }
355 .and_then(|tnr| {
356 assert_valid_optimization(&tnr.data, &starting_schema)
358 .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
359
360 #[cfg(debug_assertions)]
362 tnr.data.check_invariants(InvariantLevel::Executable)
363 .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
364
365 Ok(tnr)
366 });
367
368 match (result, prev_plan) {
370 (
372 Ok(Transformed {
373 data, transformed, ..
374 }),
375 _,
376 ) => {
377 new_plan = data;
378 observer(&new_plan, rule.as_ref());
379 if transformed {
380 log_plan(rule.name(), &new_plan);
381 } else {
382 debug!(
383 "Plan unchanged by optimizer rule '{}' (pass {})",
384 rule.name(),
385 i
386 );
387 }
388 }
389 (Err(e), Some(orig_plan)) => {
392 warn!(
396 "Skipping optimizer rule '{}' due to unexpected error: {}",
397 rule.name(),
398 e
399 );
400 new_plan = orig_plan;
401 }
402 (Err(e), None) => {
404 return Err(e.context(format!(
405 "Optimizer rule '{}' failed",
406 rule.name()
407 )));
408 }
409 }
410 }
411 log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
412
413 let plan_is_fresh =
415 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
416 if !plan_is_fresh {
417 debug!("optimizer pass {i} did not make changes");
419 break;
420 }
421 i += 1;
422 }
423
424 assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
426 e.context("Check optimizer-specific invariants after all passes")
427 })?;
428
429 new_plan
431 .check_invariants(InvariantLevel::Executable)
432 .map_err(|e| {
433 e.context("Invalid (non-executable) plan after LP Optimizers")
434 })?;
435
436 log_plan("Final optimized plan", &new_plan);
437 debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
438 Ok(new_plan)
439 }
440}
441
442fn assert_valid_optimization(
447 plan: &LogicalPlan,
448 prev_schema: &Arc<DFSchema>,
449) -> Result<()> {
450 assert_expected_schema(prev_schema, plan)?;
453
454 Ok(())
455}
456
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, Projection, col, lit};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// A failing rule is skipped when `skip_failed_rules` is enabled.
    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        opt.optimize(empty_plan(), &config, &observe).unwrap();
    }

    /// A failing rule aborts optimization when `skip_failed_rules` is off.
    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let err = opt.optimize(empty_plan(), &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    /// A rule that changes the plan's schema is reported as an error.
    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let err = opt.optimize(empty_plan(), &config, &observe).unwrap_err();

        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    /// A schema-changing rule is skipped when `skip_failed_rules` is enabled.
    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        opt.optimize(empty_plan(), &config, &observe).unwrap();
    }

    /// A rewrite that differs from the input only in field metadata is
    /// accepted by the optimizer.
    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // Before optimizing, the projection's schema carries extra metadata.
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // The rule replaced the plan with a bare table scan: no metadata.
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    /// The optimizer stops once a pass reproduces the initial plan.
    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        // Rotating three expressions returns to the start after three passes.
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // too few expressions for the rule to touch
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // One observer call per pass: three passes ran.
        assert_eq!(3, plans.len());

        // The final plan is the initial plan again.
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    /// The optimizer stops once a pass reproduces an earlier, non-initial plan.
    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        // The first pass reverses the expressions; later passes rotate them,
        // so the plan from the first pass comes back after three rotations.
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // too few expressions for the rule to touch
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // One observer call per pass: four passes ran.
        assert_eq!(4, plans.len());

        // The final plan equals the result of the first pass.
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Builds the empty relation (no rows, empty schema) most tests start from.
    fn empty_plan() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    /// Returns a copy of `schema` in which every field carries the metadata
    /// entry `"key" -> "value {i}"`, with `i` the field's position.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// Observer that ignores every notification.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// Rule whose `rewrite` always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Rule that replaces any plan with the test table scan.
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// Rule that reorders the expressions of any projection with at least two
    /// of them: rotated left by one, or reversed on the very first rewrite if
    /// so configured.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        /// `true` until the first matching rewrite has reversed a projection.
        reverse_on_first_pass: AtomicBool,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: AtomicBool::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut exprs = projection.expr.clone();

            // `swap` reads the flag and clears it in one step, so only the
            // first matching rewrite sees `true`.
            if self.reverse_on_first_pass.swap(false, Ordering::SeqCst) {
                exprs.reverse();
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, Arc::clone(&projection.input))?,
            )))
        }
    }
}