Skip to main content

datafusion_optimizer/
optimizer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! [`Optimizer`] and [`OptimizerRule`]
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{InvariantLevel, assert_expected_schema};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{DFSchema, DataFusionError, HashSet, Result, internal_err};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_outer_join::EliminateOuterJoin;
45use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
46use crate::extract_leaf_expressions::{ExtractLeafExpressions, PushDownLeafProjections};
47use crate::filter_null_join_keys::FilterNullJoinKeys;
48use crate::optimize_projections::OptimizeProjections;
49use crate::optimize_unions::OptimizeUnions;
50use crate::plan_signature::LogicalPlanSignature;
51use crate::propagate_empty_relation::PropagateEmptyRelation;
52use crate::push_down_filter::PushDownFilter;
53use crate::push_down_limit::PushDownLimit;
54use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55use crate::rewrite_set_comparison::RewriteSetComparison;
56use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
57use crate::simplify_expressions::SimplifyExpressions;
58use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
59use crate::utils::log_plan;
60
61/// Transforms one [`LogicalPlan`] into another which computes the same results,
62/// but in a potentially more efficient way.
63///
64/// See notes on [`Self::rewrite`] for details on how to implement an `OptimizerRule`.
65///
66/// To change the semantics of a `LogicalPlan`, see [`AnalyzerRule`].
67///
68/// Use [`SessionState::add_optimizer_rule`] to register additional
69/// `OptimizerRule`s.
70///
71/// [`AnalyzerRule`]: crate::analyzer::AnalyzerRule
72/// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_optimizer_rule
73pub trait OptimizerRule: Debug {
74    /// A human readable name for this optimizer rule
75    fn name(&self) -> &str;
76
77    /// How should the rule be applied by the optimizer? See comments on
78    /// [`ApplyOrder`] for details.
79    ///
80    /// If returns `None`, the default, the rule must handle recursion itself
81    fn apply_order(&self) -> Option<ApplyOrder> {
82        None
83    }
84
85    /// Does this rule support rewriting owned plans (rather than by reference)?
86    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
87    fn supports_rewrite(&self) -> bool {
88        true
89    }
90
91    /// Try to rewrite `plan` to an optimized form, returning [`Transformed::yes`]
92    /// if the plan was rewritten and [`Transformed::no`] if it was not.
93    ///
94    /// # Notes for implementations:
95    ///
96    /// ## Return the same plan if no changes were made
97    ///
98    /// If there are no suitable transformations for the input plan,
99    /// the optimizer should simply return it unmodified.
100    ///
101    /// The optimizer will call `rewrite` several times until a fixed point is
102    /// reached, so it is important that `rewrite` return [`Transformed::no`] if
103    /// the output is the same.
104    ///
105    /// ## Matching on functions
106    ///
107    /// The rule should avoid function-specific transformations, and instead use
108    /// methods on [`ScalarUDFImpl`] and [`AggregateUDFImpl`]. Specifically, the
109    /// rule should not check function names as functions can be overridden, and
110    /// may not have the same semantics as the functions provided with
111    /// DataFusion.
112    ///
113    /// For example, if a rule rewrites a function based on the check
114    /// `func.name() == "sum"`, it may rewrite the plan incorrectly if the
115    /// registered `sum` function has different semantics (for example, the
116    /// `sum` function from the `datafusion-spark` crate).
117    ///
118    /// There are still several cases that rely on function name checking in
119    /// the rules included with DataFusion. Please see [#18643] for more details
120    /// and to help remove these cases.
121    ///
122    /// [`ScalarUDFImpl`]: datafusion_expr::ScalarUDFImpl
123    /// [`AggregateUDFImpl`]: datafusion_expr::ScalarUDFImpl
124    /// [#18643]: https://github.com/apache/datafusion/issues/18643
125    fn rewrite(
126        &self,
127        _plan: LogicalPlan,
128        _config: &dyn OptimizerConfig,
129    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
130        internal_err!("rewrite is not implemented for {}", self.name())
131    }
132}
133
134/// Options to control the DataFusion Optimizer.
135pub trait OptimizerConfig {
136    /// Return the time at which the query execution started. This
137    /// time is used as the value for `now()`. If `None`, time-dependent
138    /// functions like `now()` will not be simplified during optimization.
139    fn query_execution_start_time(&self) -> Option<DateTime<Utc>>;
140
141    /// Return alias generator used to generate unique aliases for subqueries
142    fn alias_generator(&self) -> &Arc<AliasGenerator>;
143
144    fn options(&self) -> Arc<ConfigOptions>;
145
146    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
147        None
148    }
149}
150
151/// A standalone [`OptimizerConfig`] that can be used independently
152/// of DataFusion's config management
153#[derive(Debug)]
154pub struct OptimizerContext {
155    /// Query execution start time that can be used to rewrite
156    /// expressions such as `now()` to use a literal value instead.
157    /// If `None`, time-dependent functions will not be simplified.
158    query_execution_start_time: Option<DateTime<Utc>>,
159
160    /// Alias generator used to generate unique aliases for subqueries
161    alias_generator: Arc<AliasGenerator>,
162
163    options: Arc<ConfigOptions>,
164}
165
166impl OptimizerContext {
167    /// Create optimizer config
168    pub fn new() -> Self {
169        let mut options = ConfigOptions::default();
170        options.optimizer.filter_null_join_keys = true;
171
172        Self::new_with_config_options(Arc::new(options))
173    }
174
175    /// Create a optimizer config with provided [ConfigOptions].
176    pub fn new_with_config_options(options: Arc<ConfigOptions>) -> Self {
177        Self {
178            query_execution_start_time: Some(Utc::now()),
179            alias_generator: Arc::new(AliasGenerator::new()),
180            options,
181        }
182    }
183
184    /// Specify whether to enable the filter_null_keys rule
185    pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
186        Arc::make_mut(&mut self.options)
187            .optimizer
188            .filter_null_join_keys = filter_null_keys;
189        self
190    }
191
192    /// Set the query execution start time
193    pub fn with_query_execution_start_time(
194        mut self,
195        query_execution_start_time: DateTime<Utc>,
196    ) -> Self {
197        self.query_execution_start_time = Some(query_execution_start_time);
198        self
199    }
200
201    /// Clear the query execution start time. When `None`, time-dependent
202    /// functions like `now()` will not be simplified during optimization.
203    pub fn without_query_execution_start_time(mut self) -> Self {
204        self.query_execution_start_time = None;
205        self
206    }
207
208    /// Specify whether the optimizer should skip rules that produce
209    /// errors, or fail the query
210    pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
211        Arc::make_mut(&mut self.options).optimizer.skip_failed_rules = b;
212        self
213    }
214
215    /// Specify how many times to attempt to optimize the plan
216    pub fn with_max_passes(mut self, v: u8) -> Self {
217        Arc::make_mut(&mut self.options).optimizer.max_passes = v as usize;
218        self
219    }
220}
221
222impl Default for OptimizerContext {
223    /// Create optimizer config
224    fn default() -> Self {
225        Self::new()
226    }
227}
228
229impl OptimizerConfig for OptimizerContext {
230    fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
231        self.query_execution_start_time
232    }
233
234    fn alias_generator(&self) -> &Arc<AliasGenerator> {
235        &self.alias_generator
236    }
237
238    fn options(&self) -> Arc<ConfigOptions> {
239        Arc::clone(&self.options)
240    }
241}
242
243/// A rule-based optimizer.
244#[derive(Clone, Debug)]
245pub struct Optimizer {
246    /// All optimizer rules to apply
247    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
248}
249
/// Specifies how recursion for an `OptimizerRule` should be handled.
///
/// * `Some(apply_order)`: The Optimizer will recursively apply the rule to the plan.
/// * `None`: the rule must handle any required recursion itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApplyOrder {
    /// Apply the rule to the node before its inputs
    TopDown,
    /// Apply the rule to the node after its inputs
    BottomUp,
}
261
262impl Default for Optimizer {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
268impl Optimizer {
269    /// Create a new optimizer using the recommended list of rules
270    pub fn new() -> Self {
271        // NOTEs:
272        // - The order of rules in this list is important, as it determines the
273        //   order in which they are applied.
274        // - Adding a new rule here is expensive as it will be applied to all
275        //   queries, and will likely increase the optimization time. Please extend
276        //   existing rules when possible, rather than adding a new rule.
277        //   If you do add a new rule considering having aggressive no-op paths
278        //   (e.g. if the plan doesn't contain any of the nodes you are looking for
279        //    return `Transformed::no`; only works if you control the traversal).
280        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
281            Arc::new(RewriteSetComparison::new()),
282            Arc::new(OptimizeUnions::new()),
283            Arc::new(SimplifyExpressions::new()),
284            Arc::new(ReplaceDistinctWithAggregate::new()),
285            Arc::new(EliminateJoin::new()),
286            Arc::new(DecorrelatePredicateSubquery::new()),
287            Arc::new(ScalarSubqueryToJoin::new()),
288            Arc::new(DecorrelateLateralJoin::new()),
289            Arc::new(ExtractEquijoinPredicate::new()),
290            Arc::new(EliminateDuplicatedExpr::new()),
291            Arc::new(EliminateFilter::new()),
292            Arc::new(EliminateCrossJoin::new()),
293            Arc::new(EliminateLimit::new()),
294            Arc::new(PropagateEmptyRelation::new()),
295            Arc::new(FilterNullJoinKeys::default()),
296            Arc::new(EliminateOuterJoin::new()),
297            // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
298            Arc::new(PushDownLimit::new()),
299            Arc::new(PushDownFilter::new()),
300            Arc::new(SingleDistinctToGroupBy::new()),
301            // The previous optimizations added expressions and projections,
302            // that might benefit from the following rules
303            Arc::new(EliminateGroupByConstant::new()),
304            Arc::new(CommonSubexprEliminate::new()),
305            Arc::new(ExtractLeafExpressions::new()),
306            Arc::new(PushDownLeafProjections::new()),
307            Arc::new(OptimizeProjections::new()),
308        ];
309
310        Self::with_rules(rules)
311    }
312
313    /// Create a new optimizer with the given rules
314    pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
315        Self { rules }
316    }
317}
318
319/// Recursively rewrites LogicalPlans
320struct Rewriter<'a> {
321    apply_order: ApplyOrder,
322    rule: &'a dyn OptimizerRule,
323    config: &'a dyn OptimizerConfig,
324}
325
326impl<'a> Rewriter<'a> {
327    fn new(
328        apply_order: ApplyOrder,
329        rule: &'a dyn OptimizerRule,
330        config: &'a dyn OptimizerConfig,
331    ) -> Self {
332        Self {
333            apply_order,
334            rule,
335            config,
336        }
337    }
338}
339
340impl TreeNodeRewriter for Rewriter<'_> {
341    type Node = LogicalPlan;
342
343    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
344        if self.apply_order == ApplyOrder::TopDown {
345            self.rule.rewrite(node, self.config)
346        } else {
347            Ok(Transformed::no(node))
348        }
349    }
350
351    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
352        if self.apply_order == ApplyOrder::BottomUp {
353            self.rule.rewrite(node, self.config)
354        } else {
355            Ok(Transformed::no(node))
356        }
357    }
358}
359
360impl Optimizer {
361    /// Optimizes the logical plan by applying optimizer rules, and
362    /// invoking observer function after each call
363    pub fn optimize<F>(
364        &self,
365        plan: LogicalPlan,
366        config: &dyn OptimizerConfig,
367        mut observer: F,
368    ) -> Result<LogicalPlan>
369    where
370        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
371    {
372        // verify LP is valid, before the first LP optimizer pass.
373        plan.check_invariants(InvariantLevel::Executable)
374            .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
375
376        let start_time = Instant::now();
377        let options = config.options();
378        let mut new_plan = plan;
379
380        let mut previous_plans = HashSet::with_capacity(16);
381        previous_plans.insert(LogicalPlanSignature::new(&new_plan));
382
383        let starting_schema = Arc::clone(new_plan.schema());
384
385        let mut i = 0;
386        while i < options.optimizer.max_passes {
387            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
388
389            for rule in &self.rules {
390                // If skipping failed rules, copy plan before attempting to rewrite
391                // as rewriting is destructive
392                let prev_plan = options
393                    .optimizer
394                    .skip_failed_rules
395                    .then(|| new_plan.clone());
396
397                let starting_schema = Arc::clone(new_plan.schema());
398
399                let result = match rule.apply_order() {
400                    // optimizer handles recursion
401                    Some(apply_order) => new_plan.rewrite_with_subqueries(
402                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
403                    ),
404                    // rule handles recursion itself
405                    None => {
406                        rule.rewrite(new_plan, config)
407                    },
408                }
409                .and_then(|tnr| {
410                    // run checks optimizer invariant checks, per optimizer rule applied
411                    assert_valid_optimization(&tnr.data, &starting_schema)
412                        .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
413
414                    // run LP invariant checks only in debug mode for performance reasons
415                    #[cfg(debug_assertions)]
416                    tnr.data.check_invariants(InvariantLevel::Executable)
417                        .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
418
419                    Ok(tnr)
420                });
421
422                // Handle results
423                match (result, prev_plan) {
424                    // OptimizerRule was successful
425                    (
426                        Ok(Transformed {
427                            data, transformed, ..
428                        }),
429                        _,
430                    ) => {
431                        new_plan = data;
432                        observer(&new_plan, rule.as_ref());
433                        if transformed {
434                            log_plan(rule.name(), &new_plan);
435                        } else {
436                            debug!(
437                                "Plan unchanged by optimizer rule '{}' (pass {})",
438                                rule.name(),
439                                i
440                            );
441                        }
442                    }
443                    // OptimizerRule was unsuccessful, but skipped failed rules is on
444                    // so use the previous plan
445                    (Err(e), Some(orig_plan)) => {
446                        // Note to future readers: if you see this warning it signals a
447                        // bug in the DataFusion optimizer. Please consider filing a ticket
448                        // https://github.com/apache/datafusion
449                        warn!(
450                            "Skipping optimizer rule '{}' due to unexpected error: {}",
451                            rule.name(),
452                            e
453                        );
454                        new_plan = orig_plan;
455                    }
456                    // OptimizerRule was unsuccessful, but skipped failed rules is off, return error
457                    (Err(e), None) => {
458                        return Err(e.context(format!(
459                            "Optimizer rule '{}' failed",
460                            rule.name()
461                        )));
462                    }
463                }
464            }
465            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
466
467            // HashSet::insert returns, whether the value was newly inserted.
468            let plan_is_fresh =
469                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
470            if !plan_is_fresh {
471                // plan did not change, so no need to continue trying to optimize
472                debug!("optimizer pass {i} did not make changes");
473                break;
474            }
475            i += 1;
476        }
477
478        // verify that the optimizer passes only mutated what was permitted.
479        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
480            e.context("Check optimizer-specific invariants after all passes")
481        })?;
482
483        // verify LP is valid, after the last optimizer pass.
484        new_plan
485            .check_invariants(InvariantLevel::Executable)
486            .map_err(|e| {
487                e.context("Invalid (non-executable) plan after LP Optimizers")
488            })?;
489
490        log_plan("Final optimized plan", &new_plan);
491        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
492        Ok(new_plan)
493    }
494}
495
496/// These are invariants which should hold true before and after [`LogicalPlan`] optimization.
497///
498/// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular
499/// LogicalPlan is valid. Instead this address if the optimization was valid based upon permitted changes.
500fn assert_valid_optimization(
501    plan: &LogicalPlan,
502    prev_schema: &Arc<DFSchema>,
503) -> Result<()> {
504    // verify invariant: optimizer passes should not change the schema if the schema can't be cast from the previous schema.
505    // Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
506    assert_expected_schema(prev_schema, plan)?;
507
508    Ok(())
509}
510
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, Projection, col, lit};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();

        // Only check that the error message contains the expected text
        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        // if the plan creates more metadata than previously (because
        // some wrapping functions are removed, etc) do not error
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // optimizing should be ok, but the schema will have changed (no metadata)
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // metadata was removed
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        // Run a goofy optimizer, which rotates projection columns
        // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]

        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // to not trigger changed schema error
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // initial_plan is not observed, so we have 3 plans
        assert_eq!(3, plans.len());

        // we got again the initial_plan with [1, 2, 3]
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        // Run a goofy optimizer, which reverses and rotates projection columns
        // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]

        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // to not trigger changed schema error
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // initial_plan is not observed, so we have 4 plans
        assert_eq!(4, plans.len());

        // we got again the plan with [3, 2, 1]
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` where field `i` carries the metadata
    /// `{"key": "value {i}"}`.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// No-op observer for tests that do not inspect intermediate plans.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// A rule whose `rewrite` always fails.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Replaces whatever plan with a single table scan
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// A goofy rule doing rotation of columns in all projections.
    ///
    /// Useful to test cycle detection.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        // reverse exprs instead of rotating on the first pass
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut exprs = projection.expr.clone();

            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
            if *reverse {
                exprs.reverse();
                *reverse = false;
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, Arc::clone(&projection.input))?,
            )))
        }
    }
}
775                Projection::try_new(exprs, Arc::clone(&projection.input))?,
776            )))
777        }
778    }
779}