1use std::sync::Arc;
19
20use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
21
22use datafusion_common::tree_node::{Transformed, TreeNode};
23use datafusion_common::{
24 not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, TableReference,
25};
26use datafusion_expr::builder::subquery_alias;
27use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
28use datafusion_expr::{Subquery, SubqueryAlias};
29use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor};
30
31mod join;
32
33impl<S: ContextProvider> SqlToRel<'_, S> {
34 fn create_relation(
36 &self,
37 relation: TableFactor,
38 planner_context: &mut PlannerContext,
39 ) -> Result<LogicalPlan> {
40 let relation_span = relation.span();
41 let (plan, alias) = match relation {
42 TableFactor::Table {
43 name, alias, args, ..
44 } => {
45 if let Some(func_args) = args {
46 let tbl_func_name =
47 name.0.first().unwrap().as_ident().unwrap().to_string();
48 let args = func_args
49 .args
50 .into_iter()
51 .flat_map(|arg| {
52 if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) = arg
53 {
54 self.sql_expr_to_logical_expr(
55 expr,
56 &DFSchema::empty(),
57 planner_context,
58 )
59 } else {
60 plan_err!("Unsupported function argument type: {:?}", arg)
61 }
62 })
63 .collect::<Vec<_>>();
64 let provider = self
65 .context_provider
66 .get_table_function_source(&tbl_func_name, args)?;
67 let plan = LogicalPlanBuilder::scan(
68 TableReference::Bare {
69 table: "tmp_table".into(),
70 },
71 provider,
72 None,
73 )?
74 .build()?;
75 (plan, alias)
76 } else {
77 let table_ref = self.object_name_to_table_reference(name)?;
79 let table_name = table_ref.to_string();
80 let cte = planner_context.get_cte(&table_name);
81 (
82 match (
83 cte,
84 self.context_provider.get_table_source(table_ref.clone()),
85 ) {
86 (Some(cte_plan), _) => Ok(cte_plan.clone()),
87 (_, Ok(provider)) => LogicalPlanBuilder::scan(
88 table_ref.clone(),
89 provider,
90 None,
91 )?
92 .build(),
93 (None, Err(e)) => {
94 let e = e.with_diagnostic(Diagnostic::new_error(
95 format!("table '{table_ref}' not found"),
96 Span::try_from_sqlparser_span(relation_span),
97 ));
98 Err(e)
99 }
100 }?,
101 alias,
102 )
103 }
104 }
105 TableFactor::Derived {
106 subquery, alias, ..
107 } => {
108 let logical_plan = self.query_to_plan(*subquery, planner_context)?;
109 (logical_plan, alias)
110 }
111 TableFactor::NestedJoin {
112 table_with_joins,
113 alias,
114 } => (
115 self.plan_table_with_joins(*table_with_joins, planner_context)?,
116 alias,
117 ),
118 TableFactor::UNNEST {
119 alias,
120 array_exprs,
121 with_offset: false,
122 with_offset_alias: None,
123 with_ordinality,
124 } => {
125 if with_ordinality {
126 return not_impl_err!("UNNEST with ordinality is not supported yet");
127 }
128
129 let schema = DFSchema::empty();
131 let input = LogicalPlanBuilder::empty(true).build()?;
132 let unnest_exprs = array_exprs
135 .into_iter()
136 .map(|sql_expr| {
137 let expr = self.sql_expr_to_logical_expr(
138 sql_expr,
139 &schema,
140 planner_context,
141 )?;
142 Self::check_unnest_arg(&expr, &schema)?;
143 Ok(Expr::Unnest(Unnest::new(expr)))
144 })
145 .collect::<Result<Vec<_>>>()?;
146 if unnest_exprs.is_empty() {
147 return plan_err!("UNNEST must have at least one argument");
148 }
149 let logical_plan = self.try_process_unnest(input, unnest_exprs)?;
150 (logical_plan, alias)
151 }
152 TableFactor::UNNEST { .. } => {
153 return not_impl_err!(
154 "UNNEST table factor with offset is not supported yet"
155 );
156 }
157 _ => {
159 return not_impl_err!(
160 "Unsupported ast node {relation:?} in create_relation"
161 );
162 }
163 };
164
165 let optimized_plan = optimize_subquery_sort(plan)?.data;
166 if let Some(alias) = alias {
167 self.apply_table_alias(optimized_plan, alias)
168 } else {
169 Ok(optimized_plan)
170 }
171 }
172
173 pub(crate) fn create_relation_subquery(
174 &self,
175 subquery: TableFactor,
176 planner_context: &mut PlannerContext,
177 ) -> Result<LogicalPlan> {
178 let old_from_schema = planner_context
185 .set_outer_from_schema(None)
186 .unwrap_or_else(|| Arc::new(DFSchema::empty()));
187 let new_query_schema = match planner_context.outer_query_schema() {
188 Some(old_query_schema) => {
189 let mut new_query_schema = old_from_schema.as_ref().clone();
190 new_query_schema.merge(old_query_schema);
191 Some(Arc::new(new_query_schema))
192 }
193 None => Some(Arc::clone(&old_from_schema)),
194 };
195 let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
196
197 let plan = self.create_relation(subquery, planner_context)?;
198 let outer_ref_columns = plan.all_out_ref_exprs();
199
200 planner_context.set_outer_query_schema(old_query_schema);
201 planner_context.set_outer_from_schema(Some(old_from_schema));
202
203 if outer_ref_columns.is_empty() {
206 return Ok(plan);
207 }
208
209 match plan {
210 LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
211 subquery_alias(
212 LogicalPlan::Subquery(Subquery {
213 subquery: input,
214 outer_ref_columns,
215 spans: Spans::new(),
216 }),
217 alias,
218 )
219 }
220 plan => Ok(LogicalPlan::Subquery(Subquery {
221 subquery: Arc::new(plan),
222 outer_ref_columns,
223 spans: Spans::new(),
224 })),
225 }
226 }
227}
228
229fn optimize_subquery_sort(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
230 let mut has_limit = false;
237 let new_plan = plan.transform_down(|c| {
238 if let LogicalPlan::Limit(_) = c {
239 has_limit = true;
240 return Ok(Transformed::no(c));
241 }
242 match c {
243 LogicalPlan::Sort(s) => {
244 if !has_limit {
245 has_limit = false;
246 return Ok(Transformed::yes(s.input.as_ref().clone()));
247 }
248 Ok(Transformed::no(LogicalPlan::Sort(s)))
249 }
250 _ => Ok(Transformed::no(c)),
251 }
252 });
253 new_plan
254}