datafusion_optimizer/
optimizer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`Optimizer`] and [`OptimizerRule`]
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{assert_expected_schema, InvariantLevel};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
37use crate::eliminate_cross_join::EliminateCrossJoin;
38use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
39use crate::eliminate_filter::EliminateFilter;
40use crate::eliminate_group_by_constant::EliminateGroupByConstant;
41use crate::eliminate_join::EliminateJoin;
42use crate::eliminate_limit::EliminateLimit;
43use crate::eliminate_nested_union::EliminateNestedUnion;
44use crate::eliminate_one_union::EliminateOneUnion;
45use crate::eliminate_outer_join::EliminateOuterJoin;
46use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
47use crate::filter_null_join_keys::FilterNullJoinKeys;
48use crate::optimize_projections::OptimizeProjections;
49use crate::plan_signature::LogicalPlanSignature;
50use crate::propagate_empty_relation::PropagateEmptyRelation;
51use crate::push_down_filter::PushDownFilter;
52use crate::push_down_limit::PushDownLimit;
53use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
54use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
55use crate::simplify_expressions::SimplifyExpressions;
56use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
57use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
58use crate::utils::log_plan;
59
60/// `OptimizerRule`s transforms one [`LogicalPlan`] into another which
61/// computes the same results, but in a potentially more efficient
62/// way. If there are no suitable transformations for the input plan,
63/// the optimizer should simply return it unmodified.
64///
65/// To change the semantics of a `LogicalPlan`, see [`AnalyzerRule`]
66///
67/// Use [`SessionState::add_optimizer_rule`] to register additional
68/// `OptimizerRule`s.
69///
70/// [`AnalyzerRule`]: crate::analyzer::AnalyzerRule
71/// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_optimizer_rule
72pub trait OptimizerRule: Debug {
73    /// Try and rewrite `plan` to an optimized form, returning None if the plan
74    /// cannot be optimized by this rule.
75    ///
76    /// Note this API will be deprecated in the future as it requires `clone`ing
77    /// the input plan, which can be expensive. OptimizerRules should implement
78    /// [`Self::rewrite`] instead.
79    #[deprecated(
80        since = "40.0.0",
81        note = "please implement supports_rewrite and rewrite instead"
82    )]
83    fn try_optimize(
84        &self,
85        _plan: &LogicalPlan,
86        _config: &dyn OptimizerConfig,
87    ) -> Result<Option<LogicalPlan>> {
88        internal_err!("Should have called rewrite")
89    }
90
91    /// A human readable name for this optimizer rule
92    fn name(&self) -> &str;
93
94    /// How should the rule be applied by the optimizer? See comments on
95    /// [`ApplyOrder`] for details.
96    ///
97    /// If returns `None`, the default, the rule must handle recursion itself
98    fn apply_order(&self) -> Option<ApplyOrder> {
99        None
100    }
101
102    /// Does this rule support rewriting owned plans (rather than by reference)?
103    fn supports_rewrite(&self) -> bool {
104        true
105    }
106
107    /// Try to rewrite `plan` to an optimized form, returning `Transformed::yes`
108    /// if the plan was rewritten and `Transformed::no` if it was not.
109    ///
110    /// Note: this function is only called if [`Self::supports_rewrite`] returns
111    /// true. Otherwise the Optimizer calls  [`Self::try_optimize`]
112    fn rewrite(
113        &self,
114        _plan: LogicalPlan,
115        _config: &dyn OptimizerConfig,
116    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
117        internal_err!("rewrite is not implemented for {}", self.name())
118    }
119}
120
121/// Options to control the DataFusion Optimizer.
122pub trait OptimizerConfig {
123    /// Return the time at which the query execution started. This
124    /// time is used as the value for now()
125    fn query_execution_start_time(&self) -> DateTime<Utc>;
126
127    /// Return alias generator used to generate unique aliases for subqueries
128    fn alias_generator(&self) -> &Arc<AliasGenerator>;
129
130    fn options(&self) -> &ConfigOptions;
131
132    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
133        None
134    }
135}
136
137/// A standalone [`OptimizerConfig`] that can be used independently
138/// of DataFusion's config management
139#[derive(Debug)]
140pub struct OptimizerContext {
141    /// Query execution start time that can be used to rewrite
142    /// expressions such as `now()` to use a literal value instead
143    query_execution_start_time: DateTime<Utc>,
144
145    /// Alias generator used to generate unique aliases for subqueries
146    alias_generator: Arc<AliasGenerator>,
147
148    options: ConfigOptions,
149}
150
151impl OptimizerContext {
152    /// Create optimizer config
153    pub fn new() -> Self {
154        let mut options = ConfigOptions::default();
155        options.optimizer.filter_null_join_keys = true;
156
157        Self {
158            query_execution_start_time: Utc::now(),
159            alias_generator: Arc::new(AliasGenerator::new()),
160            options,
161        }
162    }
163
164    /// Specify whether to enable the filter_null_keys rule
165    pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
166        self.options.optimizer.filter_null_join_keys = filter_null_keys;
167        self
168    }
169
170    /// Specify whether the optimizer should skip rules that produce
171    /// errors, or fail the query
172    pub fn with_query_execution_start_time(
173        mut self,
174        query_execution_tart_time: DateTime<Utc>,
175    ) -> Self {
176        self.query_execution_start_time = query_execution_tart_time;
177        self
178    }
179
180    /// Specify whether the optimizer should skip rules that produce
181    /// errors, or fail the query
182    pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
183        self.options.optimizer.skip_failed_rules = b;
184        self
185    }
186
187    /// Specify how many times to attempt to optimize the plan
188    pub fn with_max_passes(mut self, v: u8) -> Self {
189        self.options.optimizer.max_passes = v as usize;
190        self
191    }
192}
193
194impl Default for OptimizerContext {
195    /// Create optimizer config
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201impl OptimizerConfig for OptimizerContext {
202    fn query_execution_start_time(&self) -> DateTime<Utc> {
203        self.query_execution_start_time
204    }
205
206    fn alias_generator(&self) -> &Arc<AliasGenerator> {
207        &self.alias_generator
208    }
209
210    fn options(&self) -> &ConfigOptions {
211        &self.options
212    }
213}
214
215/// A rule-based optimizer.
216#[derive(Clone, Debug)]
217pub struct Optimizer {
218    /// All optimizer rules to apply
219    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
220}
221
/// Describes how recursion over the plan tree is handled for an
/// `OptimizerRule`, as reported by [`OptimizerRule::apply_order`].
///
/// * `Some(apply_order)`: the Optimizer recursively applies the rule to the plan.
/// * `None`: the rule is responsible for any recursion it needs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ApplyOrder {
    /// The rule is applied to a node before it is applied to the node's inputs
    TopDown,
    /// The rule is applied to a node after it has been applied to the node's inputs
    BottomUp,
}
233
234impl Default for Optimizer {
235    fn default() -> Self {
236        Self::new()
237    }
238}
239
240impl Optimizer {
241    /// Create a new optimizer using the recommended list of rules
242    pub fn new() -> Self {
243        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
244            Arc::new(EliminateNestedUnion::new()),
245            Arc::new(SimplifyExpressions::new()),
246            Arc::new(UnwrapCastInComparison::new()),
247            Arc::new(ReplaceDistinctWithAggregate::new()),
248            Arc::new(EliminateJoin::new()),
249            Arc::new(DecorrelatePredicateSubquery::new()),
250            Arc::new(ScalarSubqueryToJoin::new()),
251            Arc::new(ExtractEquijoinPredicate::new()),
252            Arc::new(EliminateDuplicatedExpr::new()),
253            Arc::new(EliminateFilter::new()),
254            Arc::new(EliminateCrossJoin::new()),
255            Arc::new(CommonSubexprEliminate::new()),
256            Arc::new(EliminateLimit::new()),
257            Arc::new(PropagateEmptyRelation::new()),
258            // Must be after PropagateEmptyRelation
259            Arc::new(EliminateOneUnion::new()),
260            Arc::new(FilterNullJoinKeys::default()),
261            Arc::new(EliminateOuterJoin::new()),
262            // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
263            Arc::new(PushDownLimit::new()),
264            Arc::new(PushDownFilter::new()),
265            Arc::new(SingleDistinctToGroupBy::new()),
266            // The previous optimizations added expressions and projections,
267            // that might benefit from the following rules
268            Arc::new(SimplifyExpressions::new()),
269            Arc::new(UnwrapCastInComparison::new()),
270            Arc::new(CommonSubexprEliminate::new()),
271            Arc::new(EliminateGroupByConstant::new()),
272            Arc::new(OptimizeProjections::new()),
273        ];
274
275        Self::with_rules(rules)
276    }
277
278    /// Create a new optimizer with the given rules
279    pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
280        Self { rules }
281    }
282}
283
284/// Recursively rewrites LogicalPlans
285struct Rewriter<'a> {
286    apply_order: ApplyOrder,
287    rule: &'a dyn OptimizerRule,
288    config: &'a dyn OptimizerConfig,
289}
290
291impl<'a> Rewriter<'a> {
292    fn new(
293        apply_order: ApplyOrder,
294        rule: &'a dyn OptimizerRule,
295        config: &'a dyn OptimizerConfig,
296    ) -> Self {
297        Self {
298            apply_order,
299            rule,
300            config,
301        }
302    }
303}
304
305impl TreeNodeRewriter for Rewriter<'_> {
306    type Node = LogicalPlan;
307
308    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
309        if self.apply_order == ApplyOrder::TopDown {
310            optimize_plan_node(node, self.rule, self.config)
311        } else {
312            Ok(Transformed::no(node))
313        }
314    }
315
316    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
317        if self.apply_order == ApplyOrder::BottomUp {
318            optimize_plan_node(node, self.rule, self.config)
319        } else {
320            Ok(Transformed::no(node))
321        }
322    }
323}
324
325/// Invokes the Optimizer rule to rewrite the LogicalPlan in place.
326fn optimize_plan_node(
327    plan: LogicalPlan,
328    rule: &dyn OptimizerRule,
329    config: &dyn OptimizerConfig,
330) -> Result<Transformed<LogicalPlan>> {
331    if rule.supports_rewrite() {
332        return rule.rewrite(plan, config);
333    }
334
335    #[allow(deprecated)]
336    rule.try_optimize(&plan, config).map(|maybe_plan| {
337        match maybe_plan {
338            Some(new_plan) => {
339                // if the node was rewritten by the optimizer, replace the node
340                Transformed::yes(new_plan)
341            }
342            None => Transformed::no(plan),
343        }
344    })
345}
346
347impl Optimizer {
348    /// Optimizes the logical plan by applying optimizer rules, and
349    /// invoking observer function after each call
350    pub fn optimize<F>(
351        &self,
352        plan: LogicalPlan,
353        config: &dyn OptimizerConfig,
354        mut observer: F,
355    ) -> Result<LogicalPlan>
356    where
357        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
358    {
359        // verify LP is valid, before the first LP optimizer pass.
360        plan.check_invariants(InvariantLevel::Executable)
361            .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
362
363        let start_time = Instant::now();
364        let options = config.options();
365        let mut new_plan = plan;
366
367        let mut previous_plans = HashSet::with_capacity(16);
368        previous_plans.insert(LogicalPlanSignature::new(&new_plan));
369
370        let starting_schema = Arc::clone(new_plan.schema());
371
372        let mut i = 0;
373        while i < options.optimizer.max_passes {
374            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
375
376            for rule in &self.rules {
377                // If skipping failed rules, copy plan before attempting to rewrite
378                // as rewriting is destructive
379                let prev_plan = options
380                    .optimizer
381                    .skip_failed_rules
382                    .then(|| new_plan.clone());
383
384                let starting_schema = Arc::clone(new_plan.schema());
385
386                let result = match rule.apply_order() {
387                    // optimizer handles recursion
388                    Some(apply_order) => new_plan.rewrite_with_subqueries(
389                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
390                    ),
391                    // rule handles recursion itself
392                    None => optimize_plan_node(new_plan, rule.as_ref(), config),
393                }
394                .and_then(|tnr| {
395                    // run checks optimizer invariant checks, per optimizer rule applied
396                    assert_valid_optimization(&tnr.data, &starting_schema)
397                        .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
398
399                    // run LP invariant checks only in debug mode for performance reasons
400                    #[cfg(debug_assertions)]
401                    tnr.data.check_invariants(InvariantLevel::Executable)
402                        .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
403
404                    Ok(tnr)
405                });
406
407                // Handle results
408                match (result, prev_plan) {
409                    // OptimizerRule was successful
410                    (
411                        Ok(Transformed {
412                            data, transformed, ..
413                        }),
414                        _,
415                    ) => {
416                        new_plan = data;
417                        observer(&new_plan, rule.as_ref());
418                        if transformed {
419                            log_plan(rule.name(), &new_plan);
420                        } else {
421                            debug!(
422                                "Plan unchanged by optimizer rule '{}' (pass {})",
423                                rule.name(),
424                                i
425                            );
426                        }
427                    }
428                    // OptimizerRule was unsuccessful, but skipped failed rules is on
429                    // so use the previous plan
430                    (Err(e), Some(orig_plan)) => {
431                        // Note to future readers: if you see this warning it signals a
432                        // bug in the DataFusion optimizer. Please consider filing a ticket
433                        // https://github.com/apache/datafusion
434                        warn!(
435                            "Skipping optimizer rule '{}' due to unexpected error: {}",
436                            rule.name(),
437                            e
438                        );
439                        new_plan = orig_plan;
440                    }
441                    // OptimizerRule was unsuccessful, but skipped failed rules is off, return error
442                    (Err(e), None) => {
443                        return Err(e.context(format!(
444                            "Optimizer rule '{}' failed",
445                            rule.name()
446                        )));
447                    }
448                }
449            }
450            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
451
452            // HashSet::insert returns, whether the value was newly inserted.
453            let plan_is_fresh =
454                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
455            if !plan_is_fresh {
456                // plan did not change, so no need to continue trying to optimize
457                debug!("optimizer pass {} did not make changes", i);
458                break;
459            }
460            i += 1;
461        }
462
463        // verify that the optimizer passes only mutated what was permitted.
464        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
465            e.context("Check optimizer-specific invariants after all passes")
466        })?;
467
468        // verify LP is valid, after the last optimizer pass.
469        new_plan
470            .check_invariants(InvariantLevel::Executable)
471            .map_err(|e| {
472                e.context("Invalid (non-executable) plan after LP Optimizers")
473            })?;
474
475        log_plan("Final optimized plan", &new_plan);
476        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
477        Ok(new_plan)
478    }
479}
480
481/// These are invariants which should hold true before and after [`LogicalPlan`] optimization.
482///
483/// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular
484/// LogicalPlan is valid. Instead this address if the optimization was valid based upon permitted changes.
485fn assert_valid_optimization(
486    plan: &LogicalPlan,
487    prev_schema: &Arc<DFSchema>,
488) -> Result<()> {
489    // verify invariant: optimizer passes should not change the schema
490    // Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
491    assert_expected_schema(prev_schema, plan)?;
492
493    Ok(())
494}
495
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{plan_err, DFSchema, DFSchemaRef, DataFusionError, Result};
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// Builds an `EmptyRelation` plan with an empty schema that produces no row.
    fn empty_plan() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    /// A failing rule is tolerated when failed rules are skipped.
    #[test]
    fn skip_failing_rule() {
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        opt.optimize(empty_plan(), &config, &observe).unwrap();
    }

    /// A failing rule surfaces its error when failed rules are not skipped.
    #[test]
    fn no_skip_failing_rule() {
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let err = opt.optimize(empty_plan(), &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    /// A rule that changes the schema is rejected with a descriptive error.
    #[test]
    fn generate_different_schema() {
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let err = opt.optimize(empty_plan(), &config, &observe).unwrap_err();
        let message = err.strip_backtrace();
        assert!(message.starts_with(
            "Optimizer rule 'get table_scan rule' failed\n\
            caused by\n\
            Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\
            caused by\n\
            Internal error: Failed due to a difference in schemas, \
            original schema: DFSchema { inner: Schema { \
            fields: [], \
            metadata: {} }, \
            field_qualifiers: [], \
            functional_dependencies: FunctionalDependencies { deps: [] } \
            }, \
            new schema: DFSchema { inner: Schema { \
            fields: [\
              Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
              Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
              Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }\
            ], \
            metadata: {} }, \
            field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })], \
            functional_dependencies: FunctionalDependencies { deps: [] } }",
        ));
    }

    /// A schema-changing rule is tolerated when failed rules are skipped.
    #[test]
    fn skip_generate_different_schema() {
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        opt.optimize(empty_plan(), &config, &observe).unwrap();
    }

    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        // a plan whose schema differs from the previous one only in its
        // metadata (because some wrapping functions are removed, etc)
        // must not produce an error
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let projection = Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?;
        let plan = LogicalPlan::Projection(projection);

        // optimizing should be ok, but the schema will have changed  (no metadata)
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // metadata was removed
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        // Run a goofy optimizer, which rotates projection columns
        // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]

        let config = OptimizerContext::new().with_max_passes(16);
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // to not trigger changed schema error
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan = opt.optimize(initial_plan.clone(), &config, |seen, _| {
            plans.push(seen.clone())
        })?;

        // initial_plan is not observed, so we have 3 plans
        assert_eq!(3, plans.len());

        // we got again the initial_plan with [1, 2, 3]
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        // Run a goofy optimizer, which reverses and rotates projection columns
        // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]

        let config = OptimizerContext::new().with_max_passes(16);
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // to not trigger changed schema error
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan = opt.optimize(initial_plan, &config, |seen, _| {
            plans.push(seen.clone())
        })?;

        // initial_plan is not observed, so we have 4 plans
        assert_eq!(4, plans.len());

        // we got again the plan with [3, 2, 1]
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` in which every field carries a
    /// `"key"` metadata entry; the schema-level metadata is kept as is.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let mut new_fields = Vec::new();
        for (i, (qualifier, field)) in schema.iter().enumerate() {
            let metadata = [("key".into(), format!("value {i}"))].into_iter().collect();
            let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
            new_fields.push((qualifier.cloned(), Arc::new(new_arrow_field)));
        }

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// Observer that ignores every notification.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// A rule that always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Replaces whatever plan it is given with a single table scan
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let scan_plan = LogicalPlanBuilder::from(test_table_scan()?).build()?;
            Ok(Transformed::yes(scan_plan))
        }
    }

    /// A goofy rule that rotates the columns of every projection.
    ///
    /// Useful to test cycle detection.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        // when set, the first rewrite reverses the exprs instead of rotating them
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            let reverse_on_first_pass = Mutex::new(reverse_on_first_pass);
            Self {
                reverse_on_first_pass,
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn supports_rewrite(&self) -> bool {
            true
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            match plan {
                // only projections with at least two expressions are touched
                LogicalPlan::Projection(projection) if projection.expr.len() > 1 => {
                    let mut exprs = projection.expr.clone();

                    // Read the flag and clear it in one step, so that only
                    // the first rewrite can take the reversing branch.
                    let mut reverse = self.reverse_on_first_pass.lock().unwrap();
                    if std::mem::replace(&mut *reverse, false) {
                        exprs.reverse();
                    } else {
                        exprs.rotate_left(1);
                    }

                    let rotated =
                        Projection::try_new(exprs, Arc::clone(&projection.input))?;
                    Ok(Transformed::yes(LogicalPlan::Projection(rotated)))
                }
                other => Ok(Transformed::no(other)),
            }
        }
    }
}