datafusion_optimizer/analyzer/
inline_table_scan.rs1use crate::analyzer::AnalyzerRule;
22
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25use datafusion_common::{Column, Result};
26use datafusion_expr::{logical_plan::LogicalPlan, wildcard, Expr, LogicalPlanBuilder};
27
/// Analyzer rule that swaps a table scan for the logical plan its source
/// provides, when the source provides one.
///
/// The type carries no state; all of the work happens in its
/// `AnalyzerRule` implementation.
#[derive(Debug, Default)]
pub struct InlineTableScan;
32
33impl InlineTableScan {
34 pub fn new() -> Self {
35 Self {}
36 }
37}
38
39impl AnalyzerRule for InlineTableScan {
40 fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
41 plan.transform_up(analyze_internal).data()
42 }
43
44 fn name(&self) -> &str {
45 "inline_table_scan"
46 }
47}
48
49fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
50 let transformed_plan =
52 plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?;
53
54 let transformed_plan = transformed_plan.transform_data(|plan| {
55 match plan {
56 LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => {
60 if let Some(sub_plan) = table_scan.source.get_logical_plan() {
61 let sub_plan = sub_plan.into_owned();
62 let projection_exprs =
63 generate_projection_expr(&table_scan.projection, &sub_plan)?;
64 LogicalPlanBuilder::from(sub_plan)
65 .project(projection_exprs)?
66 .alias(table_scan.table_name)?
70 .build()
71 .map(Transformed::yes)
72 } else {
73 Ok(Transformed::no(LogicalPlan::TableScan(table_scan)))
74 }
75 }
76 _ => Ok(Transformed::no(plan)),
77 }
78 })?;
79
80 Ok(transformed_plan)
81}
82
83fn generate_projection_expr(
84 projection: &Option<Vec<usize>>,
85 sub_plan: &LogicalPlan,
86) -> Result<Vec<Expr>> {
87 let mut exprs = vec![];
88 if let Some(projection) = projection {
89 for i in projection {
90 exprs.push(Expr::Column(Column::from(
91 sub_plan.schema().qualified_field(*i),
92 )));
93 }
94 } else {
95 exprs.push(wildcard());
96 }
97 Ok(exprs)
98}
99
#[cfg(test)]
mod tests {
    use std::{borrow::Cow, sync::Arc, vec};

    use crate::analyzer::inline_table_scan::InlineTableScan;
    use crate::test::assert_analyzed_plan_eq;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder, TableSource};

    /// Plain table source with two non-nullable `Int64` columns, `a` and `b`.
    ///
    /// It does not override `get_logical_plan`; it serves as the leaf scan
    /// (`y`) inside the plan that [`CustomSource`] exposes.
    pub struct RawTableSource {}

    impl TableSource for RawTableSource {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn schema(&self) -> arrow::datatypes::SchemaRef {
            Arc::new(Schema::new(vec![
                Field::new("a", DataType::Int64, false),
                Field::new("b", DataType::Int64, false),
            ]))
        }

        /// Reports every filter as `Inexact`.
        fn supports_filters_pushdown(
            &self,
            filters: &[&Expr],
        ) -> datafusion_common::Result<Vec<datafusion_expr::TableProviderFilterPushDown>>
        {
            Ok((0..filters.len())
                .map(|_| datafusion_expr::TableProviderFilterPushDown::Inexact)
                .collect())
        }
    }

    /// View-like table source: it carries a logical plan (a scan of `y`) and
    /// hands it out through `get_logical_plan`, which is what the rule
    /// under test looks for.
    pub struct CustomSource {
        plan: LogicalPlan,
    }

    impl CustomSource {
        /// Builds the source around an unprojected scan of table `y`.
        fn new() -> Self {
            Self {
                plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
                    .unwrap()
                    .build()
                    .unwrap(),
            }
        }
    }

    impl TableSource for CustomSource {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        /// Reports every filter as `Exact`.
        fn supports_filters_pushdown(
            &self,
            filters: &[&Expr],
        ) -> datafusion_common::Result<Vec<datafusion_expr::TableProviderFilterPushDown>>
        {
            Ok((0..filters.len())
                .map(|_| datafusion_expr::TableProviderFilterPushDown::Exact)
                .collect())
        }

        fn schema(&self) -> arrow::datatypes::SchemaRef {
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
        }

        fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
            Some(Cow::Borrowed(&self.plan))
        }
    }

    /// An unprojected scan of `x` under a filter is replaced by the source's
    /// plan, wrapped in a wildcard projection and aliased back to `x`.
    #[test]
    fn inline_table_scan() -> datafusion_common::Result<()> {
        let scan = LogicalPlanBuilder::scan(
            "x".to_string(),
            Arc::new(CustomSource::new()),
            None,
        )?;
        let plan = scan.filter(col("x.a").eq(lit(1)))?.build()?;
        // The rendered plan indents each nesting level by two spaces; the
        // comparison is an exact string match, so the spacing matters.
        let expected = "Filter: x.a = Int32(1)\
        \n  SubqueryAlias: x\
        \n    Projection: *\
        \n      TableScan: y";

        assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
    }

    /// A scan of `x` restricted to column index 0 is replaced by the source's
    /// plan under a projection of the matching column (`y.a`).
    #[test]
    fn inline_table_scan_with_projection() -> datafusion_common::Result<()> {
        let scan = LogicalPlanBuilder::scan(
            "x".to_string(),
            Arc::new(CustomSource::new()),
            Some(vec![0]),
        )?;

        let plan = scan.build()?;
        // Two spaces of indentation per nesting level, as above.
        let expected = "SubqueryAlias: x\
        \n  Projection: y.a\
        \n    TableScan: y";

        assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
    }
}