datafusion_optimizer/
optimizer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`Optimizer`] and [`OptimizerRule`]
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{assert_expected_schema, InvariantLevel};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
37use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
38use crate::eliminate_cross_join::EliminateCrossJoin;
39use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
40use crate::eliminate_filter::EliminateFilter;
41use crate::eliminate_group_by_constant::EliminateGroupByConstant;
42use crate::eliminate_join::EliminateJoin;
43use crate::eliminate_limit::EliminateLimit;
44use crate::eliminate_nested_union::EliminateNestedUnion;
45use crate::eliminate_one_union::EliminateOneUnion;
46use crate::eliminate_outer_join::EliminateOuterJoin;
47use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
48use crate::filter_null_join_keys::FilterNullJoinKeys;
49use crate::optimize_projections::OptimizeProjections;
50use crate::plan_signature::LogicalPlanSignature;
51use crate::propagate_empty_relation::PropagateEmptyRelation;
52use crate::push_down_filter::PushDownFilter;
53use crate::push_down_limit::PushDownLimit;
54use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
56use crate::simplify_expressions::SimplifyExpressions;
57use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
58use crate::utils::log_plan;
59
/// An `OptimizerRule` transforms one [`LogicalPlan`] into another which
/// computes the same results, but in a potentially more efficient
/// way. If there are no suitable transformations for the input plan,
/// the rule should simply return it unmodified.
///
/// To change the semantics of a `LogicalPlan`, see [`AnalyzerRule`]
///
/// Use [`SessionState::add_optimizer_rule`] to register additional
/// `OptimizerRule`s.
///
/// [`AnalyzerRule`]: crate::analyzer::AnalyzerRule
/// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_optimizer_rule
pub trait OptimizerRule: Debug {
    /// A human readable name for this optimizer rule
    ///
    /// The [`Optimizer`] includes this name in its log messages and in the
    /// context attached to errors raised while running the rule.
    fn name(&self) -> &str;

    /// How should the rule be applied by the optimizer? See comments on
    /// [`ApplyOrder`] for details.
    ///
    /// When this returns `Some`, the [`Optimizer`] walks the plan (including
    /// subqueries) and calls [`Self::rewrite`] on each node in that order.
    /// If returns `None`, the default, the rule must handle recursion itself
    /// and `rewrite` is called once with the whole plan.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    /// Does this rule support rewriting owned plans (rather than by reference)?
    ///
    /// Deprecated: the optimizer no longer consults this method. The default
    /// returns `true`.
    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
    fn supports_rewrite(&self) -> bool {
        true
    }

    /// Try to rewrite `plan` to an optimized form, returning `Transformed::yes`
    /// if the plan was rewritten and `Transformed::no` if it was not.
    ///
    /// The default implementation returns an internal error naming the rule,
    /// so every rule the optimizer runs is expected to override this.
    fn rewrite(
        &self,
        _plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        internal_err!("rewrite is not implemented for {}", self.name())
    }
}
100
/// Options to control the DataFusion Optimizer.
pub trait OptimizerConfig {
    /// Return the time at which the query execution started. This
    /// time is used as the value for now()
    fn query_execution_start_time(&self) -> DateTime<Utc>;

    /// Return alias generator used to generate unique aliases for subqueries
    fn alias_generator(&self) -> &Arc<AliasGenerator>;

    /// Return the [`ConfigOptions`] governing optimization, e.g.
    /// `optimizer.max_passes` and `optimizer.skip_failed_rules`, which
    /// [`Optimizer::optimize`] reads.
    fn options(&self) -> Arc<ConfigOptions>;

    /// Return the [`FunctionRegistry`] available to optimizer rules, if any.
    ///
    /// The default implementation returns `None`.
    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
        None
    }
}
116
/// A standalone [`OptimizerConfig`] that can be used independently
/// of DataFusion's config management
#[derive(Debug)]
pub struct OptimizerContext {
    /// Query execution start time that can be used to rewrite
    /// expressions such as `now()` to use a literal value instead
    query_execution_start_time: DateTime<Utc>,

    /// Alias generator used to generate unique aliases for subqueries
    alias_generator: Arc<AliasGenerator>,

    /// Configuration options (e.g. `optimizer.max_passes`,
    /// `optimizer.skip_failed_rules`) returned by [`OptimizerConfig::options`]
    options: Arc<ConfigOptions>,
}
130
131impl OptimizerContext {
132    /// Create optimizer config
133    pub fn new() -> Self {
134        let mut options = ConfigOptions::default();
135        options.optimizer.filter_null_join_keys = true;
136
137        Self::new_with_config_options(Arc::new(options))
138    }
139
140    /// Create a optimizer config with provided [ConfigOptions].
141    pub fn new_with_config_options(options: Arc<ConfigOptions>) -> Self {
142        Self {
143            query_execution_start_time: Utc::now(),
144            alias_generator: Arc::new(AliasGenerator::new()),
145            options,
146        }
147    }
148
149    /// Specify whether to enable the filter_null_keys rule
150    pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
151        Arc::make_mut(&mut self.options)
152            .optimizer
153            .filter_null_join_keys = filter_null_keys;
154        self
155    }
156
157    /// Specify whether the optimizer should skip rules that produce
158    /// errors, or fail the query
159    pub fn with_query_execution_start_time(
160        mut self,
161        query_execution_tart_time: DateTime<Utc>,
162    ) -> Self {
163        self.query_execution_start_time = query_execution_tart_time;
164        self
165    }
166
167    /// Specify whether the optimizer should skip rules that produce
168    /// errors, or fail the query
169    pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
170        Arc::make_mut(&mut self.options).optimizer.skip_failed_rules = b;
171        self
172    }
173
174    /// Specify how many times to attempt to optimize the plan
175    pub fn with_max_passes(mut self, v: u8) -> Self {
176        Arc::make_mut(&mut self.options).optimizer.max_passes = v as usize;
177        self
178    }
179}
180
181impl Default for OptimizerContext {
182    /// Create optimizer config
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188impl OptimizerConfig for OptimizerContext {
189    fn query_execution_start_time(&self) -> DateTime<Utc> {
190        self.query_execution_start_time
191    }
192
193    fn alias_generator(&self) -> &Arc<AliasGenerator> {
194        &self.alias_generator
195    }
196
197    fn options(&self) -> Arc<ConfigOptions> {
198        Arc::clone(&self.options)
199    }
200}
201
/// A rule-based optimizer.
///
/// Applies its [`OptimizerRule`]s to a [`LogicalPlan`]; see
/// [`Optimizer::optimize`].
#[derive(Clone, Debug)]
pub struct Optimizer {
    /// All optimizer rules to apply, in the order they run within each pass
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
208
/// Specifies how recursion for an `OptimizerRule` should be handled.
///
/// This is the value returned by [`OptimizerRule::apply_order`]:
///
/// * `Some(apply_order)`: The Optimizer will recursively apply the rule to the plan.
/// * `None`: the rule must handle any required recursion itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOrder {
    /// Apply the rule to the node before its inputs
    TopDown,
    /// Apply the rule to the node after its inputs
    BottomUp,
}
220
221impl Default for Optimizer {
222    fn default() -> Self {
223        Self::new()
224    }
225}
226
227impl Optimizer {
228    /// Create a new optimizer using the recommended list of rules
229    pub fn new() -> Self {
230        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
231            Arc::new(EliminateNestedUnion::new()),
232            Arc::new(SimplifyExpressions::new()),
233            Arc::new(ReplaceDistinctWithAggregate::new()),
234            Arc::new(EliminateJoin::new()),
235            Arc::new(DecorrelatePredicateSubquery::new()),
236            Arc::new(ScalarSubqueryToJoin::new()),
237            Arc::new(DecorrelateLateralJoin::new()),
238            Arc::new(ExtractEquijoinPredicate::new()),
239            Arc::new(EliminateDuplicatedExpr::new()),
240            Arc::new(EliminateFilter::new()),
241            Arc::new(EliminateCrossJoin::new()),
242            Arc::new(EliminateLimit::new()),
243            Arc::new(PropagateEmptyRelation::new()),
244            // Must be after PropagateEmptyRelation
245            Arc::new(EliminateOneUnion::new()),
246            Arc::new(FilterNullJoinKeys::default()),
247            Arc::new(EliminateOuterJoin::new()),
248            // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
249            Arc::new(PushDownLimit::new()),
250            Arc::new(PushDownFilter::new()),
251            Arc::new(SingleDistinctToGroupBy::new()),
252            // The previous optimizations added expressions and projections,
253            // that might benefit from the following rules
254            Arc::new(EliminateGroupByConstant::new()),
255            Arc::new(CommonSubexprEliminate::new()),
256            Arc::new(OptimizeProjections::new()),
257        ];
258
259        Self::with_rules(rules)
260    }
261
262    /// Create a new optimizer with the given rules
263    pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
264        Self { rules }
265    }
266}
267
/// Recursively rewrites LogicalPlans
///
/// A [`TreeNodeRewriter`] that applies a single [`OptimizerRule`] to each
/// visited node, either on the way down or on the way up the tree depending
/// on `apply_order`.
struct Rewriter<'a> {
    /// `TopDown` applies `rule` in `f_down`; `BottomUp` applies it in `f_up`
    apply_order: ApplyOrder,
    /// The rule applied to each node
    rule: &'a dyn OptimizerRule,
    /// Configuration passed through to every `rule.rewrite` call
    config: &'a dyn OptimizerConfig,
}
274
275impl<'a> Rewriter<'a> {
276    fn new(
277        apply_order: ApplyOrder,
278        rule: &'a dyn OptimizerRule,
279        config: &'a dyn OptimizerConfig,
280    ) -> Self {
281        Self {
282            apply_order,
283            rule,
284            config,
285        }
286    }
287}
288
289impl TreeNodeRewriter for Rewriter<'_> {
290    type Node = LogicalPlan;
291
292    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
293        if self.apply_order == ApplyOrder::TopDown {
294            {
295                self.rule.rewrite(node, self.config)
296            }
297        } else {
298            Ok(Transformed::no(node))
299        }
300    }
301
302    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
303        if self.apply_order == ApplyOrder::BottomUp {
304            {
305                self.rule.rewrite(node, self.config)
306            }
307        } else {
308            Ok(Transformed::no(node))
309        }
310    }
311}
312
313impl Optimizer {
314    /// Optimizes the logical plan by applying optimizer rules, and
315    /// invoking observer function after each call
316    pub fn optimize<F>(
317        &self,
318        plan: LogicalPlan,
319        config: &dyn OptimizerConfig,
320        mut observer: F,
321    ) -> Result<LogicalPlan>
322    where
323        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
324    {
325        // verify LP is valid, before the first LP optimizer pass.
326        plan.check_invariants(InvariantLevel::Executable)
327            .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
328
329        let start_time = Instant::now();
330        let options = config.options();
331        let mut new_plan = plan;
332
333        let mut previous_plans = HashSet::with_capacity(16);
334        previous_plans.insert(LogicalPlanSignature::new(&new_plan));
335
336        let starting_schema = Arc::clone(new_plan.schema());
337
338        let mut i = 0;
339        while i < options.optimizer.max_passes {
340            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
341
342            for rule in &self.rules {
343                // If skipping failed rules, copy plan before attempting to rewrite
344                // as rewriting is destructive
345                let prev_plan = options
346                    .optimizer
347                    .skip_failed_rules
348                    .then(|| new_plan.clone());
349
350                let starting_schema = Arc::clone(new_plan.schema());
351
352                let result = match rule.apply_order() {
353                    // optimizer handles recursion
354                    Some(apply_order) => new_plan.rewrite_with_subqueries(
355                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
356                    ),
357                    // rule handles recursion itself
358                    None => {
359                        rule.rewrite(new_plan, config)
360                    },
361                }
362                .and_then(|tnr| {
363                    // run checks optimizer invariant checks, per optimizer rule applied
364                    assert_valid_optimization(&tnr.data, &starting_schema)
365                        .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
366
367                    // run LP invariant checks only in debug mode for performance reasons
368                    #[cfg(debug_assertions)]
369                    tnr.data.check_invariants(InvariantLevel::Executable)
370                        .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
371
372                    Ok(tnr)
373                });
374
375                // Handle results
376                match (result, prev_plan) {
377                    // OptimizerRule was successful
378                    (
379                        Ok(Transformed {
380                            data, transformed, ..
381                        }),
382                        _,
383                    ) => {
384                        new_plan = data;
385                        observer(&new_plan, rule.as_ref());
386                        if transformed {
387                            log_plan(rule.name(), &new_plan);
388                        } else {
389                            debug!(
390                                "Plan unchanged by optimizer rule '{}' (pass {})",
391                                rule.name(),
392                                i
393                            );
394                        }
395                    }
396                    // OptimizerRule was unsuccessful, but skipped failed rules is on
397                    // so use the previous plan
398                    (Err(e), Some(orig_plan)) => {
399                        // Note to future readers: if you see this warning it signals a
400                        // bug in the DataFusion optimizer. Please consider filing a ticket
401                        // https://github.com/apache/datafusion
402                        warn!(
403                            "Skipping optimizer rule '{}' due to unexpected error: {}",
404                            rule.name(),
405                            e
406                        );
407                        new_plan = orig_plan;
408                    }
409                    // OptimizerRule was unsuccessful, but skipped failed rules is off, return error
410                    (Err(e), None) => {
411                        return Err(e.context(format!(
412                            "Optimizer rule '{}' failed",
413                            rule.name()
414                        )));
415                    }
416                }
417            }
418            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
419
420            // HashSet::insert returns, whether the value was newly inserted.
421            let plan_is_fresh =
422                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
423            if !plan_is_fresh {
424                // plan did not change, so no need to continue trying to optimize
425                debug!("optimizer pass {i} did not make changes");
426                break;
427            }
428            i += 1;
429        }
430
431        // verify that the optimizer passes only mutated what was permitted.
432        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
433            e.context("Check optimizer-specific invariants after all passes")
434        })?;
435
436        // verify LP is valid, after the last optimizer pass.
437        new_plan
438            .check_invariants(InvariantLevel::Executable)
439            .map_err(|e| {
440                e.context("Invalid (non-executable) plan after LP Optimizers")
441            })?;
442
443        log_plan("Final optimized plan", &new_plan);
444        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
445        Ok(new_plan)
446    }
447}
448
449/// These are invariants which should hold true before and after [`LogicalPlan`] optimization.
450///
451/// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular
452/// LogicalPlan is valid. Instead this address if the optimization was valid based upon permitted changes.
453fn assert_valid_optimization(
454    plan: &LogicalPlan,
455    prev_schema: &Arc<DFSchema>,
456) -> Result<()> {
457    // verify invariant: optimizer passes should not change the schema if the schema can't be cast from the previous schema.
458    // Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
459    assert_expected_schema(prev_schema, plan)?;
460
461    Ok(())
462}
463
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();

        // Simplify assert to check the error message contains the expected message
        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        // if the plan creates more metadata than previously (because
        // some wrapping functions are removed, etc) do not error
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // The projection's schema carries field metadata that the table
        // scan's schema lacks, so the two differ before optimization.
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        // GetTableScanRule replaces the plan with the bare table scan. This
        // must still pass the schema checks even though the metadata is dropped.
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // metadata was removed
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        // Run a goofy optimizer, which rotates projection columns
        // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]

        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // to not trigger changed schema error
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // initial_plan is not observed, so we have 3 plans
        assert_eq!(3, plans.len());

        // we got again the initial_plan with [1, 2, 3]
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        // Run a goofy optimizer, which reverses and rotates projection columns
        // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]

        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // to not trigger changed schema error
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // initial_plan is not observed, so we have 4 plans
        assert_eq!(4, plans.len());

        // we got again the plan with [3, 2, 1]
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` where field `i` carries metadata
    /// `{"key": "value {i}"}`.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// Observer that ignores every notification.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// A rule that always fails.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Replaces whatever plan with a single table scan
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// A goofy rule doing rotation of columns in all projections.
    ///
    /// Useful to test cycle detection.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        // reverse exprs instead of rotating on the first pass
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut exprs = projection.expr.clone();

            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
            if *reverse {
                exprs.reverse();
                *reverse = false;
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, Arc::clone(&projection.input))?,
            )))
        }
    }
}