datafusion_sql/relation/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
21
22use datafusion_common::tree_node::{Transformed, TreeNode};
23use datafusion_common::{
24    not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, TableReference,
25};
26use datafusion_expr::builder::subquery_alias;
27use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
28use datafusion_expr::{Subquery, SubqueryAlias};
29use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor};
30
31mod join;
32
33impl<S: ContextProvider> SqlToRel<'_, S> {
34    /// Create a `LogicalPlan` that scans the named relation
35    fn create_relation(
36        &self,
37        relation: TableFactor,
38        planner_context: &mut PlannerContext,
39    ) -> Result<LogicalPlan> {
40        let relation_span = relation.span();
41        let (plan, alias) = match relation {
42            TableFactor::Table {
43                name, alias, args, ..
44            } => {
45                if let Some(func_args) = args {
46                    let tbl_func_name =
47                        name.0.first().unwrap().as_ident().unwrap().to_string();
48                    let args = func_args
49                        .args
50                        .into_iter()
51                        .flat_map(|arg| {
52                            if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) = arg
53                            {
54                                self.sql_expr_to_logical_expr(
55                                    expr,
56                                    &DFSchema::empty(),
57                                    planner_context,
58                                )
59                            } else {
60                                plan_err!("Unsupported function argument type: {:?}", arg)
61                            }
62                        })
63                        .collect::<Vec<_>>();
64                    let provider = self
65                        .context_provider
66                        .get_table_function_source(&tbl_func_name, args)?;
67                    let plan = LogicalPlanBuilder::scan(
68                        TableReference::Bare {
69                            table: "tmp_table".into(),
70                        },
71                        provider,
72                        None,
73                    )?
74                    .build()?;
75                    (plan, alias)
76                } else {
77                    // Normalize name and alias
78                    let table_ref = self.object_name_to_table_reference(name)?;
79                    let table_name = table_ref.to_string();
80                    let cte = planner_context.get_cte(&table_name);
81                    (
82                        match (
83                            cte,
84                            self.context_provider.get_table_source(table_ref.clone()),
85                        ) {
86                            (Some(cte_plan), _) => Ok(cte_plan.clone()),
87                            (_, Ok(provider)) => LogicalPlanBuilder::scan(
88                                table_ref.clone(),
89                                provider,
90                                None,
91                            )?
92                            .build(),
93                            (None, Err(e)) => {
94                                let e = e.with_diagnostic(Diagnostic::new_error(
95                                    format!("table '{table_ref}' not found"),
96                                    Span::try_from_sqlparser_span(relation_span),
97                                ));
98                                Err(e)
99                            }
100                        }?,
101                        alias,
102                    )
103                }
104            }
105            TableFactor::Derived {
106                subquery, alias, ..
107            } => {
108                let logical_plan = self.query_to_plan(*subquery, planner_context)?;
109                (logical_plan, alias)
110            }
111            TableFactor::NestedJoin {
112                table_with_joins,
113                alias,
114            } => (
115                self.plan_table_with_joins(*table_with_joins, planner_context)?,
116                alias,
117            ),
118            TableFactor::UNNEST {
119                alias,
120                array_exprs,
121                with_offset: false,
122                with_offset_alias: None,
123                with_ordinality,
124            } => {
125                if with_ordinality {
126                    return not_impl_err!("UNNEST with ordinality is not supported yet");
127                }
128
129                // Unnest table factor has empty input
130                let schema = DFSchema::empty();
131                let input = LogicalPlanBuilder::empty(true).build()?;
132                // Unnest table factor can have multiple arguments.
133                // We treat each argument as a separate unnest expression.
134                let unnest_exprs = array_exprs
135                    .into_iter()
136                    .map(|sql_expr| {
137                        let expr = self.sql_expr_to_logical_expr(
138                            sql_expr,
139                            &schema,
140                            planner_context,
141                        )?;
142                        Self::check_unnest_arg(&expr, &schema)?;
143                        Ok(Expr::Unnest(Unnest::new(expr)))
144                    })
145                    .collect::<Result<Vec<_>>>()?;
146                if unnest_exprs.is_empty() {
147                    return plan_err!("UNNEST must have at least one argument");
148                }
149                let logical_plan = self.try_process_unnest(input, unnest_exprs)?;
150                (logical_plan, alias)
151            }
152            TableFactor::UNNEST { .. } => {
153                return not_impl_err!(
154                    "UNNEST table factor with offset is not supported yet"
155                );
156            }
157            // @todo Support TableFactory::TableFunction?
158            _ => {
159                return not_impl_err!(
160                    "Unsupported ast node {relation:?} in create_relation"
161                );
162            }
163        };
164
165        let optimized_plan = optimize_subquery_sort(plan)?.data;
166        if let Some(alias) = alias {
167            self.apply_table_alias(optimized_plan, alias)
168        } else {
169            Ok(optimized_plan)
170        }
171    }
172
173    pub(crate) fn create_relation_subquery(
174        &self,
175        subquery: TableFactor,
176        planner_context: &mut PlannerContext,
177    ) -> Result<LogicalPlan> {
178        // At this point for a syntactically valid query the outer_from_schema is
179        // guaranteed to be set, so the `.unwrap()` call will never panic. This
180        // is the case because we only call this method for lateral table
181        // factors, and those can never be the first factor in a FROM list. This
182        // means we arrived here through the `for` loop in `plan_from_tables` or
183        // the `for` loop in `plan_table_with_joins`.
184        let old_from_schema = planner_context
185            .set_outer_from_schema(None)
186            .unwrap_or_else(|| Arc::new(DFSchema::empty()));
187        let new_query_schema = match planner_context.outer_query_schema() {
188            Some(old_query_schema) => {
189                let mut new_query_schema = old_from_schema.as_ref().clone();
190                new_query_schema.merge(old_query_schema);
191                Some(Arc::new(new_query_schema))
192            }
193            None => Some(Arc::clone(&old_from_schema)),
194        };
195        let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
196
197        let plan = self.create_relation(subquery, planner_context)?;
198        let outer_ref_columns = plan.all_out_ref_exprs();
199
200        planner_context.set_outer_query_schema(old_query_schema);
201        planner_context.set_outer_from_schema(Some(old_from_schema));
202
203        // We can omit the subquery wrapper if there are no columns
204        // referencing the outer scope.
205        if outer_ref_columns.is_empty() {
206            return Ok(plan);
207        }
208
209        match plan {
210            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
211                subquery_alias(
212                    LogicalPlan::Subquery(Subquery {
213                        subquery: input,
214                        outer_ref_columns,
215                        spans: Spans::new(),
216                    }),
217                    alias,
218                )
219            }
220            plan => Ok(LogicalPlan::Subquery(Subquery {
221                subquery: Arc::new(plan),
222                outer_ref_columns,
223                spans: Spans::new(),
224            })),
225        }
226    }
227}
228
229fn optimize_subquery_sort(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
230    // When initializing subqueries, we examine sort options since they might be unnecessary.
231    // They are only important if the subquery result is affected by the ORDER BY statement,
232    // which can happen when we have:
233    // 1. DISTINCT ON / ARRAY_AGG ... => Handled by an `Aggregate` and its requirements.
234    // 2. RANK / ROW_NUMBER ... => Handled by a `WindowAggr` and its requirements.
235    // 3. LIMIT => Handled by a `Sort`, so we need to search for it.
236    let mut has_limit = false;
237    let new_plan = plan.transform_down(|c| {
238        if let LogicalPlan::Limit(_) = c {
239            has_limit = true;
240            return Ok(Transformed::no(c));
241        }
242        match c {
243            LogicalPlan::Sort(s) => {
244                if !has_limit {
245                    has_limit = false;
246                    return Ok(Transformed::yes(s.input.as_ref().clone()));
247                }
248                Ok(Transformed::no(LogicalPlan::Sort(s)))
249            }
250            _ => Ok(Transformed::no(c)),
251        }
252    });
253    new_plan
254}