Skip to main content

datafusion_optimizer/
optimizer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`Optimizer`] and [`OptimizerRule`]
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{InvariantLevel, assert_expected_schema};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{DFSchema, DataFusionError, HashSet, Result, internal_err};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_outer_join::EliminateOuterJoin;
45use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
46use crate::extract_leaf_expressions::{ExtractLeafExpressions, PushDownLeafProjections};
47use crate::filter_null_join_keys::FilterNullJoinKeys;
48use crate::optimize_projections::OptimizeProjections;
49use crate::optimize_unions::OptimizeUnions;
50use crate::plan_signature::LogicalPlanSignature;
51use crate::propagate_empty_relation::PropagateEmptyRelation;
52use crate::push_down_filter::PushDownFilter;
53use crate::push_down_limit::PushDownLimit;
54use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55use crate::rewrite_set_comparison::RewriteSetComparison;
56use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
57use crate::simplify_expressions::SimplifyExpressions;
58use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
59use crate::unions_to_filter::UnionsToFilter;
60use crate::utils::log_plan;
61
62/// Transforms one [`LogicalPlan`] into another which computes the same results,
63/// but in a potentially more efficient way.
64///
65/// See notes on [`Self::rewrite`] for details on how to implement an `OptimizerRule`.
66///
67/// To change the semantics of a `LogicalPlan`, see [`AnalyzerRule`].
68///
69/// Use [`SessionState::add_optimizer_rule`] to register additional
70/// `OptimizerRule`s.
71///
72/// [`AnalyzerRule`]: crate::analyzer::AnalyzerRule
73/// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_optimizer_rule
74pub trait OptimizerRule: Debug {
75    /// A human readable name for this optimizer rule
76    fn name(&self) -> &str;
77
78    /// How should the rule be applied by the optimizer? See comments on
79    /// [`ApplyOrder`] for details.
80    ///
81    /// If returns `None`, the default, the rule must handle recursion itself
82    fn apply_order(&self) -> Option<ApplyOrder> {
83        None
84    }
85
86    /// Does this rule support rewriting owned plans (rather than by reference)?
87    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
88    fn supports_rewrite(&self) -> bool {
89        true
90    }
91
92    /// Try to rewrite `plan` to an optimized form, returning [`Transformed::yes`]
93    /// if the plan was rewritten and [`Transformed::no`] if it was not.
94    ///
95    /// # Notes for implementations:
96    ///
97    /// ## Return the same plan if no changes were made
98    ///
99    /// If there are no suitable transformations for the input plan,
100    /// the optimizer should simply return it unmodified.
101    ///
102    /// The optimizer will call `rewrite` several times until a fixed point is
103    /// reached, so it is important that `rewrite` return [`Transformed::no`] if
104    /// the output is the same.
105    ///
106    /// ## Matching on functions
107    ///
108    /// The rule should avoid function-specific transformations, and instead use
109    /// methods on [`ScalarUDFImpl`] and [`AggregateUDFImpl`]. Specifically, the
110    /// rule should not check function names as functions can be overridden, and
111    /// may not have the same semantics as the functions provided with
112    /// DataFusion.
113    ///
114    /// For example, if a rule rewrites a function based on the check
115    /// `func.name() == "sum"`, it may rewrite the plan incorrectly if the
116    /// registered `sum` function has different semantics (for example, the
117    /// `sum` function from the `datafusion-spark` crate).
118    ///
119    /// There are still several cases that rely on function name checking in
120    /// the rules included with DataFusion. Please see [#18643] for more details
121    /// and to help remove these cases.
122    ///
123    /// [`ScalarUDFImpl`]: datafusion_expr::ScalarUDFImpl
124    /// [`AggregateUDFImpl`]: datafusion_expr::ScalarUDFImpl
125    /// [#18643]: https://github.com/apache/datafusion/issues/18643
126    fn rewrite(
127        &self,
128        _plan: LogicalPlan,
129        _config: &dyn OptimizerConfig,
130    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
131        internal_err!("rewrite is not implemented for {}", self.name())
132    }
133}
134
135/// Options to control the DataFusion Optimizer.
136pub trait OptimizerConfig {
137    /// Return the time at which the query execution started. This
138    /// time is used as the value for `now()`. If `None`, time-dependent
139    /// functions like `now()` will not be simplified during optimization.
140    fn query_execution_start_time(&self) -> Option<DateTime<Utc>>;
141
142    /// Return alias generator used to generate unique aliases for subqueries
143    fn alias_generator(&self) -> &Arc<AliasGenerator>;
144
145    fn options(&self) -> Arc<ConfigOptions>;
146
147    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
148        None
149    }
150}
151
152/// A standalone [`OptimizerConfig`] that can be used independently
153/// of DataFusion's config management
154#[derive(Debug)]
155pub struct OptimizerContext {
156    /// Query execution start time that can be used to rewrite
157    /// expressions such as `now()` to use a literal value instead.
158    /// If `None`, time-dependent functions will not be simplified.
159    query_execution_start_time: Option<DateTime<Utc>>,
160
161    /// Alias generator used to generate unique aliases for subqueries
162    alias_generator: Arc<AliasGenerator>,
163
164    options: Arc<ConfigOptions>,
165}
166
167impl OptimizerContext {
168    /// Create optimizer config
169    pub fn new() -> Self {
170        let mut options = ConfigOptions::default();
171        options.optimizer.filter_null_join_keys = true;
172
173        Self::new_with_config_options(Arc::new(options))
174    }
175
176    /// Create a optimizer config with provided [ConfigOptions].
177    pub fn new_with_config_options(options: Arc<ConfigOptions>) -> Self {
178        Self {
179            query_execution_start_time: Some(Utc::now()),
180            alias_generator: Arc::new(AliasGenerator::new()),
181            options,
182        }
183    }
184
185    /// Specify whether to enable the filter_null_keys rule
186    pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
187        Arc::make_mut(&mut self.options)
188            .optimizer
189            .filter_null_join_keys = filter_null_keys;
190        self
191    }
192
193    /// Set the query execution start time
194    pub fn with_query_execution_start_time(
195        mut self,
196        query_execution_start_time: DateTime<Utc>,
197    ) -> Self {
198        self.query_execution_start_time = Some(query_execution_start_time);
199        self
200    }
201
202    /// Clear the query execution start time. When `None`, time-dependent
203    /// functions like `now()` will not be simplified during optimization.
204    pub fn without_query_execution_start_time(mut self) -> Self {
205        self.query_execution_start_time = None;
206        self
207    }
208
209    /// Specify whether the optimizer should skip rules that produce
210    /// errors, or fail the query
211    pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
212        Arc::make_mut(&mut self.options).optimizer.skip_failed_rules = b;
213        self
214    }
215
216    /// Specify how many times to attempt to optimize the plan
217    pub fn with_max_passes(mut self, v: u8) -> Self {
218        Arc::make_mut(&mut self.options).optimizer.max_passes = v as usize;
219        self
220    }
221}
222
223impl Default for OptimizerContext {
224    /// Create optimizer config
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
230impl OptimizerConfig for OptimizerContext {
231    fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
232        self.query_execution_start_time
233    }
234
235    fn alias_generator(&self) -> &Arc<AliasGenerator> {
236        &self.alias_generator
237    }
238
239    fn options(&self) -> Arc<ConfigOptions> {
240        Arc::clone(&self.options)
241    }
242}
243
244/// A rule-based optimizer.
245#[derive(Clone, Debug)]
246pub struct Optimizer {
247    /// All optimizer rules to apply
248    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
249}
250
/// Specifies how recursion for an `OptimizerRule` should be handled.
///
/// A rule reports its choice as an `Option<ApplyOrder>`:
///
/// * `Some(apply_order)`: The Optimizer will recursively apply the rule to the plan.
/// * `None`: the rule must handle any required recursion itself.
///
/// The enum is a plain fieldless value type, so it is `Copy`, comparable
/// with `==`, and usable as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApplyOrder {
    /// Apply the rule to the node before its inputs
    TopDown,
    /// Apply the rule to the node after its inputs
    BottomUp,
}
262
263impl Default for Optimizer {
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
269impl Optimizer {
270    /// Create a new optimizer using the recommended list of rules
271    pub fn new() -> Self {
272        // NOTEs:
273        // - The order of rules in this list is important, as it determines the
274        //   order in which they are applied.
275        // - Adding a new rule here is expensive as it will be applied to all
276        //   queries, and will likely increase the optimization time. Please extend
277        //   existing rules when possible, rather than adding a new rule.
278        //   If you do add a new rule considering having aggressive no-op paths
279        //   (e.g. if the plan doesn't contain any of the nodes you are looking for
280        //    return `Transformed::no`; only works if you control the traversal).
281        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
282            Arc::new(RewriteSetComparison::new()),
283            Arc::new(OptimizeUnions::new()),
284            Arc::new(UnionsToFilter::new()),
285            Arc::new(SimplifyExpressions::new()),
286            Arc::new(ReplaceDistinctWithAggregate::new()),
287            Arc::new(EliminateJoin::new()),
288            Arc::new(DecorrelatePredicateSubquery::new()),
289            Arc::new(ScalarSubqueryToJoin::new()),
290            Arc::new(DecorrelateLateralJoin::new()),
291            Arc::new(ExtractEquijoinPredicate::new()),
292            Arc::new(EliminateDuplicatedExpr::new()),
293            Arc::new(EliminateFilter::new()),
294            Arc::new(EliminateCrossJoin::new()),
295            Arc::new(EliminateLimit::new()),
296            Arc::new(PropagateEmptyRelation::new()),
297            Arc::new(FilterNullJoinKeys::default()),
298            Arc::new(EliminateOuterJoin::new()),
299            // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
300            Arc::new(PushDownLimit::new()),
301            Arc::new(PushDownFilter::new()),
302            Arc::new(SingleDistinctToGroupBy::new()),
303            // The previous optimizations added expressions and projections,
304            // that might benefit from the following rules
305            Arc::new(EliminateGroupByConstant::new()),
306            Arc::new(CommonSubexprEliminate::new()),
307            Arc::new(ExtractLeafExpressions::new()),
308            Arc::new(PushDownLeafProjections::new()),
309            Arc::new(OptimizeProjections::new()),
310        ];
311
312        Self::with_rules(rules)
313    }
314
315    /// Create a new optimizer with the given rules
316    pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
317        Self { rules }
318    }
319}
320
321/// Recursively rewrites LogicalPlans
322struct Rewriter<'a> {
323    apply_order: ApplyOrder,
324    rule: &'a dyn OptimizerRule,
325    config: &'a dyn OptimizerConfig,
326}
327
328impl<'a> Rewriter<'a> {
329    fn new(
330        apply_order: ApplyOrder,
331        rule: &'a dyn OptimizerRule,
332        config: &'a dyn OptimizerConfig,
333    ) -> Self {
334        Self {
335            apply_order,
336            rule,
337            config,
338        }
339    }
340}
341
342impl TreeNodeRewriter for Rewriter<'_> {
343    type Node = LogicalPlan;
344
345    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
346        if self.apply_order == ApplyOrder::TopDown {
347            self.rule.rewrite(node, self.config)
348        } else {
349            Ok(Transformed::no(node))
350        }
351    }
352
353    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
354        if self.apply_order == ApplyOrder::BottomUp {
355            self.rule.rewrite(node, self.config)
356        } else {
357            Ok(Transformed::no(node))
358        }
359    }
360}
361
362impl Optimizer {
363    /// Optimizes the logical plan by applying optimizer rules, and
364    /// invoking observer function after each call
365    pub fn optimize<F>(
366        &self,
367        plan: LogicalPlan,
368        config: &dyn OptimizerConfig,
369        mut observer: F,
370    ) -> Result<LogicalPlan>
371    where
372        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
373    {
374        // verify LP is valid, before the first LP optimizer pass.
375        plan.check_invariants(InvariantLevel::Executable)
376            .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
377
378        let start_time = Instant::now();
379        let options = config.options();
380        let mut new_plan = plan;
381
382        let mut previous_plans = HashSet::with_capacity(16);
383        previous_plans.insert(LogicalPlanSignature::new(&new_plan));
384
385        let starting_schema = Arc::clone(new_plan.schema());
386
387        let mut i = 0;
388        while i < options.optimizer.max_passes {
389            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
390
391            for rule in &self.rules {
392                // If skipping failed rules, copy plan before attempting to rewrite
393                // as rewriting is destructive
394                let prev_plan = options
395                    .optimizer
396                    .skip_failed_rules
397                    .then(|| new_plan.clone());
398
399                let starting_schema = Arc::clone(new_plan.schema());
400
401                let result = match rule.apply_order() {
402                    // optimizer handles recursion
403                    Some(apply_order) => new_plan.rewrite_with_subqueries(
404                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
405                    ),
406                    // rule handles recursion itself
407                    None => {
408                        rule.rewrite(new_plan, config)
409                    },
410                }
411                .and_then(|tnr| {
412                    // run checks optimizer invariant checks, per optimizer rule applied
413                    assert_valid_optimization(&tnr.data, &starting_schema)
414                        .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
415
416                    // run LP invariant checks only in debug mode for performance reasons
417                    #[cfg(debug_assertions)]
418                    tnr.data.check_invariants(InvariantLevel::Executable)
419                        .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
420
421                    Ok(tnr)
422                });
423
424                // Handle results
425                match (result, prev_plan) {
426                    // OptimizerRule was successful
427                    (
428                        Ok(Transformed {
429                            data, transformed, ..
430                        }),
431                        _,
432                    ) => {
433                        new_plan = data;
434                        observer(&new_plan, rule.as_ref());
435                        if transformed {
436                            log_plan(rule.name(), &new_plan);
437                        } else {
438                            debug!(
439                                "Plan unchanged by optimizer rule '{}' (pass {})",
440                                rule.name(),
441                                i
442                            );
443                        }
444                    }
445                    // OptimizerRule was unsuccessful, but skipped failed rules is on
446                    // so use the previous plan
447                    (Err(e), Some(orig_plan)) => {
448                        // Note to future readers: if you see this warning it signals a
449                        // bug in the DataFusion optimizer. Please consider filing a ticket
450                        // https://github.com/apache/datafusion
451                        warn!(
452                            "Skipping optimizer rule '{}' due to unexpected error: {}",
453                            rule.name(),
454                            e
455                        );
456                        new_plan = orig_plan;
457                    }
458                    // OptimizerRule was unsuccessful, but skipped failed rules is off, return error
459                    (Err(e), None) => {
460                        return Err(e.context(format!(
461                            "Optimizer rule '{}' failed",
462                            rule.name()
463                        )));
464                    }
465                }
466            }
467            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
468
469            // HashSet::insert returns, whether the value was newly inserted.
470            let plan_is_fresh =
471                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
472            if !plan_is_fresh {
473                // plan did not change, so no need to continue trying to optimize
474                debug!("optimizer pass {i} did not make changes");
475                break;
476            }
477            i += 1;
478        }
479
480        // verify that the optimizer passes only mutated what was permitted.
481        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
482            e.context("Check optimizer-specific invariants after all passes")
483        })?;
484
485        // verify LP is valid, after the last optimizer pass.
486        new_plan
487            .check_invariants(InvariantLevel::Executable)
488            .map_err(|e| {
489                e.context("Invalid (non-executable) plan after LP Optimizers")
490            })?;
491
492        log_plan("Final optimized plan", &new_plan);
493        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
494        Ok(new_plan)
495    }
496}
497
498/// These are invariants which should hold true before and after [`LogicalPlan`] optimization.
499///
500/// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular
501/// LogicalPlan is valid. Instead, this address if the optimization was valid based upon permitted changes.
502fn assert_valid_optimization(
503    plan: &LogicalPlan,
504    prev_schema: &Arc<DFSchema>,
505) -> Result<()> {
506    // verify invariant: optimizer passes should not change the schema if the schema can't be cast from the previous schema.
507    // Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
508    assert_expected_schema(prev_schema, plan)?;
509
510    Ok(())
511}
512
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, Projection, col, lit};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// An empty relation with an empty schema that produces no rows; the
    /// simplest possible input plan.
    fn empty_plan() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    /// A projection of `[1, 2, 3]` wrapped in a projection of `[100]`.
    ///
    /// The outer single-column projection keeps the top-level schema stable
    /// while the inner projection's columns are shuffled, so the changed
    /// schema error is not triggered.
    fn stacked_literal_projections() -> Result<LogicalPlan> {
        LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])?
            .build()
    }

    /// A failing rule is skipped when skipping failed rules is enabled.
    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        opt.optimize(empty_plan(), &config, &observe).unwrap();
    }

    /// A failing rule fails the optimization when skipping is disabled, and
    /// the error names the rule.
    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let err = opt.optimize(empty_plan(), &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    /// A rule that changes the schema is reported as an error.
    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let err = opt.optimize(empty_plan(), &config, &observe).unwrap_err();

        // Only check that the error message contains the expected message
        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    /// A rule that changes the schema is skipped when skipping failed rules
    /// is enabled.
    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        opt.optimize(empty_plan(), &config, &observe).unwrap();
    }

    /// A schema that differs from the original only in field metadata is
    /// accepted.
    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        // if the plan creates more metadata than previously (because
        // some wrapping functions are removed, etc) do not error
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // optimizing should be ok, but the schema will have changed  (no metadata)
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // metadata was removed
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    /// Optimization stops once the plan cycles back to the initial plan.
    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        // Run a goofy optimizer, which rotates projection columns
        // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]

        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = stacked_literal_projections()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // initial_plan is not observed, so we have 3 plans
        assert_eq!(3, plans.len());

        // we got again the initial_plan with [1, 2, 3]
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    /// Optimization stops once the plan cycles back to an intermediate plan.
    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        // Run a goofy optimizer, which reverses and rotates projection columns
        // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]

        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = stacked_literal_projections()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // initial_plan is not observed, so we have 4 plans
        assert_eq!(4, plans.len());

        // we got again the plan with [3, 2, 1]
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` in which the field at index `i` carries
    /// the metadata entry `"key" -> "value {i}"`. Schema-level metadata is
    /// copied over unchanged.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// Observer that ignores everything it is shown.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// A rule whose `rewrite` always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Replaces whatever plan with a single table scan
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// A goofy rule doing rotation of columns in all projections.
    ///
    /// Useful to test cycle detection.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        // reverse exprs instead of rotating on the first pass
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            // Only projections with at least two expressions are shuffled
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            // Read the flag and clear it in one step, so only the first
            // shuffled projection is reversed and later ones are rotated.
            let reverse_now = std::mem::replace(
                &mut *self.reverse_on_first_pass.lock().unwrap(),
                false,
            );

            // The projection is owned, so its expressions and input can be
            // moved out rather than cloned.
            let mut exprs = projection.expr;
            if reverse_now {
                exprs.reverse();
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, projection.input)?,
            )))
        }
    }
}