datafusion_optimizer/analyzer/
inline_table_scan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Analyzer rule that replaces `TableScan`s whose source provides its own
//! `LogicalPlan` (such as DataFrames and views) with that plan, inlined.
20
21use crate::analyzer::AnalyzerRule;
22
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25use datafusion_common::{Column, Result};
26use datafusion_expr::{logical_plan::LogicalPlan, wildcard, Expr, LogicalPlanBuilder};
27
/// Analyzer rule that inlines every `TableScan` whose source provides a
/// [`LogicalPlan`] of its own (for example a DataFrame or a `ViewTable`).
///
/// The rule holds no state, so all instances are interchangeable.
#[derive(Debug, Default)]
pub struct InlineTableScan;
32
33impl InlineTableScan {
34    pub fn new() -> Self {
35        Self {}
36    }
37}
38
39impl AnalyzerRule for InlineTableScan {
40    fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
41        plan.transform_up(analyze_internal).data()
42    }
43
44    fn name(&self) -> &str {
45        "inline_table_scan"
46    }
47}
48
49fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
50    // rewrite any subqueries in the plan first
51    let transformed_plan =
52        plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?;
53
54    let transformed_plan = transformed_plan.transform_data(|plan| {
55        match plan {
56            // Match only on scans without filter / projection / fetch
57            // Views and DataFrames won't have those added
58            // during the early stage of planning.
59            LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => {
60                if let Some(sub_plan) = table_scan.source.get_logical_plan() {
61                    let sub_plan = sub_plan.into_owned();
62                    let projection_exprs =
63                        generate_projection_expr(&table_scan.projection, &sub_plan)?;
64                    LogicalPlanBuilder::from(sub_plan)
65                        .project(projection_exprs)?
66                        // Ensures that the reference to the inlined table remains the
67                        // same, meaning we don't have to change any of the parent nodes
68                        // that reference this table.
69                        .alias(table_scan.table_name)?
70                        .build()
71                        .map(Transformed::yes)
72                } else {
73                    Ok(Transformed::no(LogicalPlan::TableScan(table_scan)))
74                }
75            }
76            _ => Ok(Transformed::no(plan)),
77        }
78    })?;
79
80    Ok(transformed_plan)
81}
82
83fn generate_projection_expr(
84    projection: &Option<Vec<usize>>,
85    sub_plan: &LogicalPlan,
86) -> Result<Vec<Expr>> {
87    let mut exprs = vec![];
88    if let Some(projection) = projection {
89        for i in projection {
90            exprs.push(Expr::Column(Column::from(
91                sub_plan.schema().qualified_field(*i),
92            )));
93        }
94    } else {
95        exprs.push(wildcard());
96    }
97    Ok(exprs)
98}
99
#[cfg(test)]
mod tests {
    use std::{borrow::Cow, sync::Arc, vec};

    use crate::analyzer::inline_table_scan::InlineTableScan;
    use crate::test::assert_analyzed_plan_eq;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_common::Result;
    use datafusion_expr::{
        col, lit, Expr, LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown,
        TableSource,
    };

    /// Table source with two non-nullable `Int64` columns, `a` and `b`. It
    /// does not override `get_logical_plan`.
    pub struct RawTableSource {}

    impl TableSource for RawTableSource {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            let fields = vec![
                Field::new("a", DataType::Int64, false),
                Field::new("b", DataType::Int64, false),
            ];
            Arc::new(Schema::new(fields))
        }

        /// Reports every filter as `Inexact`.
        fn supports_filters_pushdown(
            &self,
            filters: &[&Expr],
        ) -> Result<Vec<TableProviderFilterPushDown>> {
            Ok(filters
                .iter()
                .map(|_| TableProviderFilterPushDown::Inexact)
                .collect())
        }
    }

    /// Table source that exposes a logical plan (a scan of table `y` backed by
    /// [`RawTableSource`]), making it a candidate for inlining.
    pub struct CustomSource {
        plan: LogicalPlan,
    }

    impl CustomSource {
        fn new() -> Self {
            let builder =
                LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
                    .unwrap();
            let plan = builder.build().unwrap();
            Self { plan }
        }
    }

    impl TableSource for CustomSource {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        /// Reports every filter as `Exact`.
        fn supports_filters_pushdown(
            &self,
            filters: &[&Expr],
        ) -> Result<Vec<TableProviderFilterPushDown>> {
            Ok(filters
                .iter()
                .map(|_| TableProviderFilterPushDown::Exact)
                .collect())
        }

        fn schema(&self) -> SchemaRef {
            let fields = vec![Field::new("a", DataType::Int64, false)];
            Arc::new(Schema::new(fields))
        }

        /// Hands out the wrapped plan by reference.
        fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
            Some(Cow::Borrowed(&self.plan))
        }
    }

    /// Starts a plan that scans table `x`, backed by a fresh [`CustomSource`],
    /// with the given optional projection.
    fn scan_x(projection: Option<Vec<usize>>) -> Result<LogicalPlanBuilder> {
        LogicalPlanBuilder::scan(
            "x".to_string(),
            Arc::new(CustomSource::new()),
            projection,
        )
    }

    /// A scan of `x` below a filter is replaced by the source's plan, wrapped
    /// in a wildcard projection and aliased back to `x`.
    #[test]
    fn inline_table_scan() -> Result<()> {
        let predicate = col("x.a").eq(lit(1));
        let plan = scan_x(None)?.filter(predicate)?.build()?;
        let expected = "Filter: x.a = Int32(1)\
        \n  SubqueryAlias: x\
        \n    Projection: *\
        \n      TableScan: y";

        assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
    }

    /// A projection on the scan turns into an explicit column projection over
    /// the inlined plan.
    #[test]
    fn inline_table_scan_with_projection() -> Result<()> {
        let plan = scan_x(Some(vec![0]))?.build()?;
        let expected = "SubqueryAlias: x\
        \n  Projection: y.a\
        \n    TableScan: y";

        assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
    }
}