Skip to main content

datafusion_sql/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
19use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::TableReference;
27use datafusion_common::config::SqlParserOptions;
28use datafusion_common::datatype::{DataTypeExt, FieldExt};
29use datafusion_common::error::add_possible_columns_to_diag;
30use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
31use datafusion_common::{
32    DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
33    plan_datafusion_err,
34};
35use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
36pub use datafusion_expr::planner::ContextProvider;
37use datafusion_expr::utils::find_column_exprs;
38use datafusion_expr::{Expr, col};
39use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
40use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
41use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
42
/// SQL parser options
///
/// A small `Copy` collection of flags that control how SQL is planned.
/// The default value of every flag is defined by [`ParserOptions::new`].
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Whether to parse float as decimal. Defaults to `false`.
    pub parse_float_as_decimal: bool,
    /// Whether to normalize identifiers. Defaults to `true`.
    pub enable_ident_normalization: bool,
    /// Whether to support varchar with length. Defaults to `true`.
    pub support_varchar_with_length: bool,
    /// Whether to normalize options value. Defaults to `false`.
    pub enable_options_value_normalization: bool,
    /// Whether to collect spans. Defaults to `false`.
    pub collect_spans: bool,
    /// Whether string types (VARCHAR, CHAR, Text, and String) are mapped to
    /// `Utf8View` during SQL planning. Defaults to `true`.
    pub map_string_types_to_utf8view: bool,
    /// Default null ordering for sorting expressions.
    /// Defaults to [`NullOrdering::NullsMax`].
    pub default_null_ordering: NullOrdering,
}
61
62impl ParserOptions {
63    /// Creates a new `ParserOptions` instance with default values.
64    ///
65    /// # Examples
66    ///
67    /// ```
68    /// use datafusion_sql::planner::ParserOptions;
69    /// let opts = ParserOptions::new();
70    /// assert_eq!(opts.parse_float_as_decimal, false);
71    /// assert_eq!(opts.enable_ident_normalization, true);
72    /// ```
73    pub fn new() -> Self {
74        Self {
75            parse_float_as_decimal: false,
76            enable_ident_normalization: true,
77            support_varchar_with_length: true,
78            map_string_types_to_utf8view: true,
79            enable_options_value_normalization: false,
80            collect_spans: false,
81            // By default, `nulls_max` is used to follow Postgres's behavior.
82            // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
83            default_null_ordering: NullOrdering::NullsMax,
84        }
85    }
86
87    /// Sets the `parse_float_as_decimal` option.
88    ///
89    /// # Examples
90    ///
91    /// ```
92    /// use datafusion_sql::planner::ParserOptions;
93    /// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
94    /// assert_eq!(opts.parse_float_as_decimal, true);
95    /// ```
96    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
97        self.parse_float_as_decimal = value;
98        self
99    }
100
101    /// Sets the `enable_ident_normalization` option.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use datafusion_sql::planner::ParserOptions;
107    /// let opts = ParserOptions::new().with_enable_ident_normalization(false);
108    /// assert_eq!(opts.enable_ident_normalization, false);
109    /// ```
110    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
111        self.enable_ident_normalization = value;
112        self
113    }
114
115    /// Sets the `support_varchar_with_length` option.
116    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
117        self.support_varchar_with_length = value;
118        self
119    }
120
121    /// Sets the `map_string_types_to_utf8view` option.
122    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
123        self.map_string_types_to_utf8view = value;
124        self
125    }
126
127    /// Sets the `enable_options_value_normalization` option.
128    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
129        self.enable_options_value_normalization = value;
130        self
131    }
132
133    /// Sets the `collect_spans` option.
134    pub fn with_collect_spans(mut self, value: bool) -> Self {
135        self.collect_spans = value;
136        self
137    }
138
139    /// Sets the `default_null_ordering` option.
140    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
141        self.default_null_ordering = value;
142        self
143    }
144}
145
146impl Default for ParserOptions {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl From<&SqlParserOptions> for ParserOptions {
153    fn from(options: &SqlParserOptions) -> Self {
154        Self {
155            parse_float_as_decimal: options.parse_float_as_decimal,
156            enable_ident_normalization: options.enable_ident_normalization,
157            support_varchar_with_length: options.support_varchar_with_length,
158            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
159            enable_options_value_normalization: options
160                .enable_options_value_normalization,
161            collect_spans: options.collect_spans,
162            default_null_ordering: options.default_null_ordering.as_str().into(),
163        }
164    }
165}
166
/// Represents the null ordering for sorting expressions.
///
/// This is the *default* placement of nulls, used when a sort expression does
/// not spell out `NULLS FIRST` / `NULLS LAST` itself. Use
/// [`NullOrdering::nulls_first`] to resolve it for a given sort direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NullOrdering {
    /// Nulls are treated as larger than any other value: they appear last in
    /// ascending order and first in descending order.
    NullsMax,
    /// Nulls are treated as smaller than any other value: they appear first in
    /// ascending order and last in descending order.
    NullsMin,
    /// Nulls appear first, regardless of sort direction.
    NullsFirst,
    /// Nulls appear last, regardless of sort direction.
    NullsLast,
}

impl NullOrdering {
    /// Evaluates the null ordering based on the given ascending flag.
    ///
    /// `asc` is `true` for an ascending sort and `false` for a descending one.
    ///
    /// # Returns
    /// * `true` if nulls should appear first.
    /// * `false` if nulls should appear last.
    pub fn nulls_first(&self, asc: bool) -> bool {
        match self {
            // Largest value: at the end when ascending, at the front when descending.
            Self::NullsMax => !asc,
            // Smallest value: at the front when ascending, at the end when descending.
            Self::NullsMin => asc,
            Self::NullsFirst => true,
            Self::NullsLast => false,
        }
    }
}
195
196impl FromStr for NullOrdering {
197    type Err = DataFusionError;
198
199    fn from_str(s: &str) -> Result<Self> {
200        match s {
201            "nulls_max" => Ok(Self::NullsMax),
202            "nulls_min" => Ok(Self::NullsMin),
203            "nulls_first" => Ok(Self::NullsFirst),
204            "nulls_last" => Ok(Self::NullsLast),
205            _ => plan_err!(
206                "Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"
207            ),
208        }
209    }
210}
211
212impl From<&str> for NullOrdering {
213    fn from(s: &str) -> Self {
214        Self::from_str(s).unwrap_or(Self::NullsMax)
215    }
216}
217
218/// Ident Normalizer
219#[derive(Debug)]
220pub struct IdentNormalizer {
221    normalize: bool,
222}
223
224impl Default for IdentNormalizer {
225    fn default() -> Self {
226        Self { normalize: true }
227    }
228}
229
230impl IdentNormalizer {
231    pub fn new(normalize: bool) -> Self {
232        Self { normalize }
233    }
234
235    pub fn normalize(&self, ident: Ident) -> String {
236        if self.normalize {
237            crate::utils::normalize_ident(ident)
238        } else {
239            ident.value
240        }
241    }
242}
243
/// Struct to store the states used by the Planner. The Planner will leverage the states
/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
/// Parameter Data Types provided with PREPARE statement and the query schema of the
/// outer query plan.
///
/// # Cloning
///
/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned.
/// This helps resolve scoping issues of CTEs.
/// By using cloning, a subquery can inherit CTEs from the outer query
/// and can also define its own private CTEs without affecting the outer query.
#[derive(Debug, Clone)]
pub struct PlannerContext {
    /// Data types for numbered parameters ($1, $2, etc), if supplied
    /// in `PREPARE` statement
    prepare_param_data_types: Arc<Vec<Option<FieldRef>>>,
    /// Map of CTE name to logical plan of the WITH clause.
    /// Use `Arc<LogicalPlan>` to allow cheap cloning
    ctes: HashMap<String, Arc<LogicalPlan>>,

    /// The schemas of the outer query relations, used to resolve outer
    /// referenced columns in a subquery. Kept as a stack so that nested
    /// subqueries are handled: the outermost relation is the first entry and
    /// the adjacent outer relation is the last.
    outer_queries_schemas_stack: Vec<DFSchemaRef>,
    /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
    /// FROM clauses, this should become a suffix of the `outer_query_schema`.
    // NOTE(review): no field named `outer_query_schema` exists in this struct;
    // presumably this refers to `outer_queries_schemas_stack` — confirm.
    outer_from_schema: Option<DFSchemaRef>,
    /// The query schema defined by the table
    create_table_schema: Option<DFSchemaRef>,
    /// When planning non-first queries in a set expression
    /// (UNION/INTERSECT/EXCEPT), holds the schema of the left-most query.
    /// Used to alias duplicate expressions to match the left side's field names.
    set_expr_left_schema: Option<DFSchemaRef>,
    /// The parameters of all lambdas seen so far, keyed by parameter name
    lambda_parameters: HashMap<String, FieldRef>,
}
280
281impl Default for PlannerContext {
282    fn default() -> Self {
283        Self::new()
284    }
285}
286
287impl PlannerContext {
288    /// Create an empty PlannerContext
289    pub fn new() -> Self {
290        Self {
291            prepare_param_data_types: Arc::new(vec![]),
292            ctes: HashMap::new(),
293            outer_queries_schemas_stack: vec![],
294            outer_from_schema: None,
295            create_table_schema: None,
296            set_expr_left_schema: None,
297            lambda_parameters: HashMap::new(),
298        }
299    }
300
301    /// Update the PlannerContext with provided prepare_param_data_types
302    pub fn with_prepare_param_data_types(
303        mut self,
304        prepare_param_data_types: Vec<Option<FieldRef>>,
305    ) -> Self {
306        self.prepare_param_data_types = prepare_param_data_types.into();
307        self
308    }
309
310    /// Return the stack of outer relations' schemas, the outer most
311    /// relation are at the first entry
312    pub fn outer_queries_schemas(&self) -> &[DFSchemaRef] {
313        &self.outer_queries_schemas_stack
314    }
315
316    /// Return an iterator of the subquery relations' schemas, innermost
317    /// relation is returned first.
318    ///
319    /// This order corresponds to the order of resolution when looking up column
320    /// references in subqueries, which start from the innermost relation and
321    /// then look up the outer relations one by one until a match is found or no
322    /// more outer relation exist.
323    ///
324    /// NOTE this is *REVERSED* order of [`Self::outer_queries_schemas`]
325    ///
326    /// This is useful to resolve the column reference in the subquery by
327    /// looking up the outer query schemas one by one.
328    pub fn outer_schemas_iter(&self) -> impl Iterator<Item = &DFSchemaRef> {
329        self.outer_queries_schemas_stack.iter().rev()
330    }
331
332    /// Sets the outer query schema, returning the existing one, if
333    /// any
334    pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
335        self.outer_queries_schemas_stack.push(schema);
336    }
337
338    /// The schema of the adjacent outer relation
339    pub fn latest_outer_query_schema(&self) -> Option<&DFSchemaRef> {
340        self.outer_queries_schemas_stack.last()
341    }
342
343    /// Remove the schema of the adjacent outer relation
344    pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
345        self.outer_queries_schemas_stack.pop()
346    }
347
348    pub fn set_table_schema(
349        &mut self,
350        mut schema: Option<DFSchemaRef>,
351    ) -> Option<DFSchemaRef> {
352        std::mem::swap(&mut self.create_table_schema, &mut schema);
353        schema
354    }
355
356    pub fn table_schema(&self) -> Option<DFSchemaRef> {
357        self.create_table_schema.clone()
358    }
359
360    // Return a clone of the outer FROM schema
361    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
362        self.outer_from_schema.clone()
363    }
364
365    /// Sets the outer FROM schema, returning the existing one, if any
366    pub fn set_outer_from_schema(
367        &mut self,
368        mut schema: Option<DFSchemaRef>,
369    ) -> Option<DFSchemaRef> {
370        std::mem::swap(&mut self.outer_from_schema, &mut schema);
371        schema
372    }
373
374    /// Extends the FROM schema, returning the existing one, if any
375    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
376        match self.outer_from_schema.as_mut() {
377            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
378            None => self.outer_from_schema = Some(Arc::clone(schema)),
379        };
380        Ok(())
381    }
382
383    /// Return the types of parameters (`$1`, `$2`, etc) if known
384    pub fn prepare_param_data_types(&self) -> &[Option<FieldRef>] {
385        &self.prepare_param_data_types
386    }
387
388    /// Returns true if there is a Common Table Expression (CTE) /
389    /// Subquery for the specified name
390    pub fn contains_cte(&self, cte_name: &str) -> bool {
391        self.ctes.contains_key(cte_name)
392    }
393
394    /// Inserts a LogicalPlan for the Common Table Expression (CTE) /
395    /// Subquery for the specified name
396    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
397        let cte_name = cte_name.into();
398        self.ctes.insert(cte_name, Arc::new(plan));
399    }
400
401    /// Return a plan for the Common Table Expression (CTE) / Subquery for the
402    /// specified name
403    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
404        self.ctes.get(cte_name).map(|cte| cte.as_ref())
405    }
406
407    pub fn lambda_parameters(&self) -> &HashMap<String, FieldRef> {
408        &self.lambda_parameters
409    }
410
411    pub fn with_lambda_parameters(
412        mut self,
413        parameters: impl IntoIterator<Item = FieldRef>,
414    ) -> Self {
415        self.lambda_parameters
416            .extend(parameters.into_iter().map(|f| (f.name().clone(), f)));
417
418        self
419    }
420
421    /// Remove the plan of CTE / Subquery for the specified name
422    pub(super) fn remove_cte(&mut self, cte_name: &str) {
423        self.ctes.remove(cte_name);
424    }
425
426    /// Sets the left-most set expression schema, returning the previous value
427    pub(super) fn set_set_expr_left_schema(
428        &mut self,
429        schema: Option<DFSchemaRef>,
430    ) -> Option<DFSchemaRef> {
431        std::mem::replace(&mut self.set_expr_left_schema, schema)
432    }
433}
434
/// SQL query planner and binder
///
/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
///
/// You can control the behavior of the planner by providing [`ParserOptions`].
///
/// It performs the following tasks:
///
/// 1. Name and type resolution (called "binding" in other systems). This
///    phase looks up table and column names using the [`ContextProvider`].
/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
///
/// It does not perform type coercion, or perform optimization, which are done
/// by subsequent passes.
///
/// Key interfaces are:
/// * [`Self::sql_statement_to_plan`]: Convert a statement
///   (e.g. `SELECT ...`) into a [`LogicalPlan`]
/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
pub struct SqlToRel<'a, S: ContextProvider> {
    /// Borrowed provider consulted during planning (e.g. for configuration
    /// options and an optional custom type planner).
    pub(crate) context_provider: &'a S,
    /// Options controlling how SQL is planned.
    pub(crate) options: ParserOptions,
    /// Normalizer for SQL identifiers, configured from
    /// `options.enable_ident_normalization` at construction time.
    pub(crate) ident_normalizer: IdentNormalizer,
}
459
460impl<'a, S: ContextProvider> SqlToRel<'a, S> {
461    /// Create a new query planner.
462    ///
463    /// The query planner derives the parser options from the context provider.
464    pub fn new(context_provider: &'a S) -> Self {
465        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
466        Self::new_with_options(context_provider, parser_options)
467    }
468
469    /// Create a new query planner with the given parser options.
470    ///
471    /// The query planner ignores the parser options from the context provider
472    /// and uses the given parser options instead.
473    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
474        let ident_normalize = options.enable_ident_normalization;
475
476        SqlToRel {
477            context_provider,
478            options,
479            ident_normalizer: IdentNormalizer::new(ident_normalize),
480        }
481    }
482
483    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
484        let mut fields = Vec::with_capacity(columns.len());
485
486        for column in columns {
487            let data_type = self.convert_data_type_to_field(&column.data_type)?;
488            let not_nullable = column
489                .options
490                .iter()
491                .any(|x| x.option == ColumnOption::NotNull);
492            fields.push(
493                data_type
494                    .as_ref()
495                    .clone()
496                    .with_name(self.ident_normalizer.normalize(column.name))
497                    .with_nullable(!not_nullable),
498            );
499        }
500
501        Ok(Schema::new(fields))
502    }
503
504    /// Returns a vector of (column_name, default_expr) pairs
505    pub(super) fn build_column_defaults(
506        &self,
507        columns: &Vec<SQLColumnDef>,
508        planner_context: &mut PlannerContext,
509    ) -> Result<Vec<(String, Expr)>> {
510        let mut column_defaults = vec![];
511        // Default expressions are restricted, column references are not allowed
512        let empty_schema = DFSchema::empty();
513        let error_desc = |e: DataFusionError| match e {
514            DataFusionError::SchemaError(ref err, _)
515                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
516            {
517                plan_datafusion_err!(
518                    "Column reference is not allowed in the DEFAULT expression : {}",
519                    e
520                )
521            }
522            _ => e,
523        };
524
525        for column in columns {
526            if let Some(default_sql_expr) =
527                column.options.iter().find_map(|o| match &o.option {
528                    ColumnOption::Default(expr) => Some(expr),
529                    _ => None,
530                })
531            {
532                let default_expr = self
533                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
534                    .map_err(error_desc)?;
535                column_defaults.push((
536                    self.ident_normalizer.normalize(column.name.clone()),
537                    default_expr,
538                ));
539            }
540        }
541        Ok(column_defaults)
542    }
543
544    /// Apply the given TableAlias to the input plan
545    pub(crate) fn apply_table_alias(
546        &self,
547        plan: LogicalPlan,
548        alias: TableAlias,
549    ) -> Result<LogicalPlan> {
550        let idents = alias.columns.into_iter().map(|c| c.name).collect();
551        let plan = self.apply_expr_alias(plan, idents)?;
552
553        LogicalPlanBuilder::from(plan)
554            .alias(TableReference::bare(
555                self.ident_normalizer.normalize(alias.name),
556            ))?
557            .build()
558    }
559
560    pub(crate) fn apply_expr_alias(
561        &self,
562        plan: LogicalPlan,
563        idents: Vec<Ident>,
564    ) -> Result<LogicalPlan> {
565        if idents.is_empty() {
566            Ok(plan)
567        } else if idents.len() != plan.schema().fields().len() {
568            plan_err!(
569                "Source table contains {} columns but only {} \
570                names given as column alias",
571                plan.schema().fields().len(),
572                idents.len()
573            )
574        } else {
575            let fields = plan.schema().fields().clone();
576            LogicalPlanBuilder::from(plan)
577                .project(fields.iter().zip(idents).map(|(field, ident)| {
578                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
579                }))?
580                .build()
581        }
582    }
583
584    /// Validate the schema provides all of the columns referenced in the expressions.
585    pub(crate) fn validate_schema_satisfies_exprs(
586        &self,
587        schema: &DFSchema,
588        exprs: &[Expr],
589    ) -> Result<()> {
590        find_column_exprs(exprs)
591            .iter()
592            .try_for_each(|col| match col {
593                Expr::Column(col) => match &col.relation {
594                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
595                    None => {
596                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
597                            Ok(())
598                        } else {
599                            Err(field_not_found(
600                                col.relation.clone(),
601                                col.name.as_str(),
602                                schema,
603                            ))
604                        }
605                    }
606                }
607                .map_err(|err: DataFusionError| match &err {
608                    DataFusionError::SchemaError(inner, _)
609                        if matches!(
610                            inner.as_ref(),
611                            SchemaError::FieldNotFound { .. }
612                        ) =>
613                    {
614                        let SchemaError::FieldNotFound {
615                            field,
616                            valid_fields,
617                        } = inner.as_ref()
618                        else {
619                            unreachable!()
620                        };
621                        let mut diagnostic = if let Some(relation) = &col.relation {
622                            Diagnostic::new_error(
623                                format!(
624                                    "column '{}' not found in '{}'",
625                                    &col.name, relation
626                                ),
627                                col.spans().first(),
628                            )
629                        } else {
630                            Diagnostic::new_error(
631                                format!("column '{}' not found", &col.name),
632                                col.spans().first(),
633                            )
634                        };
635                        add_possible_columns_to_diag(
636                            &mut diagnostic,
637                            field,
638                            valid_fields,
639                        );
640                        err.with_diagnostic(diagnostic)
641                    }
642                    _ => err,
643                }),
644                _ => internal_err!("Not a column"),
645            })
646    }
647
648    pub(crate) fn convert_data_type_to_field(
649        &self,
650        sql_type: &SQLDataType,
651    ) -> Result<FieldRef> {
652        // First check if any of the registered type_planner can handle this type
653        if let Some(type_planner) = self.context_provider.get_type_planner()
654            && let Some(data_type) = type_planner.plan_type_field(sql_type)?
655        {
656            return Ok(data_type);
657        }
658
659        // If no type_planner can handle this type, use the default conversion
660        match sql_type {
661            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
662                // Arrays may be multi-dimensional.
663                Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
664            }
665            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
666                inner_sql_type,
667                maybe_array_size,
668            )) => {
669                let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
670                if let Some(array_size) = maybe_array_size {
671                    let array_size: i32 = (*array_size).try_into().map_err(|_| {
672                        plan_datafusion_err!(
673                            "Array size must be a positive 32 bit integer, got {array_size}"
674                        )
675                    })?;
676                    Ok(inner_field.into_fixed_size_list(array_size))
677                } else {
678                    Ok(inner_field.into_list())
679                }
680            }
681            SQLDataType::Array(ArrayElemTypeDef::None) => {
682                not_impl_err!("Arrays with unspecified type is not supported")
683            }
684            other => Ok(self
685                .convert_simple_data_type(other)?
686                .into_nullable_field_ref()),
687        }
688    }
689
690    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
691        match sql_type {
692            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
693            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
694            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
695            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
696                Ok(DataType::Int32)
697            }
698            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
699            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
700            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
701                Ok(DataType::UInt16)
702            }
703            SQLDataType::IntUnsigned(_)
704            | SQLDataType::IntegerUnsigned(_)
705            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
706            SQLDataType::Varchar(length) => {
707                match (length, self.options.support_varchar_with_length) {
708                    (Some(_), false) => plan_err!(
709                        "does not support Varchar with length, \
710                    please set `support_varchar_with_length` to be true"
711                    ),
712                    _ => {
713                        if self.options.map_string_types_to_utf8view {
714                            Ok(DataType::Utf8View)
715                        } else {
716                            Ok(DataType::Utf8)
717                        }
718                    }
719                }
720            }
721            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
722                Ok(DataType::UInt64)
723            }
724            SQLDataType::Float(_) => Ok(DataType::Float32),
725            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
726            SQLDataType::Double(ExactNumberInfo::None)
727            | SQLDataType::DoublePrecision
728            | SQLDataType::Float8 => Ok(DataType::Float64),
729            SQLDataType::Double(
730                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
731            ) => {
732                not_impl_err!(
733                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
734                )
735            }
736            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
737                if self.options.map_string_types_to_utf8view {
738                    Ok(DataType::Utf8View)
739                } else {
740                    Ok(DataType::Utf8)
741                }
742            }
743            SQLDataType::Timestamp(precision, tz_info)
744                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
745            {
746                let tz = if *tz_info == TimezoneInfo::Tz
747                    || *tz_info == TimezoneInfo::WithTimeZone
748                {
749                    // Timestamp With Time Zone
750                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
751                    // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
752                    self.context_provider.options().execution.time_zone.clone()
753                } else {
754                    // Timestamp Without Time zone
755                    None
756                };
757                let precision = match precision {
758                    Some(0) => TimeUnit::Second,
759                    Some(3) => TimeUnit::Millisecond,
760                    Some(6) => TimeUnit::Microsecond,
761                    None | Some(9) => TimeUnit::Nanosecond,
762                    _ => unreachable!(),
763                };
764                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
765            }
766            SQLDataType::Date => Ok(DataType::Date32),
767            SQLDataType::Time(None, tz_info) => {
768                if *tz_info == TimezoneInfo::None
769                    || *tz_info == TimezoneInfo::WithoutTimeZone
770                {
771                    Ok(DataType::Time64(TimeUnit::Nanosecond))
772                } else {
773                    // We don't support TIMETZ and TIME WITH TIME ZONE for now
774                    not_impl_err!("Unsupported SQL type {sql_type}")
775                }
776            }
777            SQLDataType::Numeric(exact_number_info)
778            | SQLDataType::Decimal(exact_number_info) => {
779                let (precision, scale) = match *exact_number_info {
780                    ExactNumberInfo::None => (None, None),
781                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
782                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
783                        (Some(precision), Some(scale))
784                    }
785                };
786                make_decimal_type(precision, scale.map(|s| s as u64))
787            }
788            SQLDataType::Bytea => Ok(DataType::Binary),
789            SQLDataType::Interval { fields, precision } => {
790                if fields.is_some() || precision.is_some() {
791                    return not_impl_err!("Unsupported SQL type {sql_type}");
792                }
793                Ok(DataType::Interval(IntervalUnit::MonthDayNano))
794            }
795            SQLDataType::Struct(fields, _) => {
796                let fields = fields
797                    .iter()
798                    .enumerate()
799                    .map(|(idx, sql_struct_field)| {
800                        let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
801                        let field_name = match &sql_struct_field.field_name {
802                            Some(ident) => ident.clone(),
803                            None => Ident::new(format!("c{idx}")),
804                        };
805                        Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
806                    })
807                    .collect::<Result<Vec<_>>>()?;
808                Ok(DataType::Struct(Fields::from(fields)))
809            }
810            SQLDataType::Nvarchar(_)
811            | SQLDataType::JSON
812            | SQLDataType::Uuid
813            | SQLDataType::Binary(_)
814            | SQLDataType::Varbinary(_)
815            | SQLDataType::Blob(_)
816            | SQLDataType::Datetime(_)
817            | SQLDataType::Regclass
818            | SQLDataType::Custom(_, _)
819            | SQLDataType::Array(_)
820            | SQLDataType::Enum(_, _)
821            | SQLDataType::Set(_)
822            | SQLDataType::MediumInt(_)
823            | SQLDataType::MediumIntUnsigned(_)
824            | SQLDataType::Character(_)
825            | SQLDataType::CharacterVarying(_)
826            | SQLDataType::CharVarying(_)
827            | SQLDataType::CharacterLargeObject(_)
828            | SQLDataType::CharLargeObject(_)
829            | SQLDataType::Timestamp(_, _)
830            | SQLDataType::Time(Some(_), _)
831            | SQLDataType::Dec(_)
832            | SQLDataType::BigNumeric(_)
833            | SQLDataType::BigDecimal(_)
834            | SQLDataType::Clob(_)
835            | SQLDataType::Bytes(_)
836            | SQLDataType::Int64
837            | SQLDataType::Float64
838            | SQLDataType::JSONB
839            | SQLDataType::Unspecified
840            | SQLDataType::Int16
841            | SQLDataType::Int32
842            | SQLDataType::Int128
843            | SQLDataType::Int256
844            | SQLDataType::UInt8
845            | SQLDataType::UInt16
846            | SQLDataType::UInt32
847            | SQLDataType::UInt64
848            | SQLDataType::UInt128
849            | SQLDataType::UInt256
850            | SQLDataType::Float32
851            | SQLDataType::Date32
852            | SQLDataType::Datetime64(_, _)
853            | SQLDataType::FixedString(_)
854            | SQLDataType::Map(_, _)
855            | SQLDataType::Tuple(_)
856            | SQLDataType::Nested(_)
857            | SQLDataType::Union(_)
858            | SQLDataType::Nullable(_)
859            | SQLDataType::LowCardinality(_)
860            | SQLDataType::Trigger
861            | SQLDataType::TinyBlob
862            | SQLDataType::MediumBlob
863            | SQLDataType::LongBlob
864            | SQLDataType::TinyText
865            | SQLDataType::MediumText
866            | SQLDataType::LongText
867            | SQLDataType::Bit(_)
868            | SQLDataType::BitVarying(_)
869            | SQLDataType::Signed
870            | SQLDataType::SignedInteger
871            | SQLDataType::Unsigned
872            | SQLDataType::UnsignedInteger
873            | SQLDataType::AnyType
874            | SQLDataType::Table(_)
875            | SQLDataType::VarBit(_)
876            | SQLDataType::UTinyInt
877            | SQLDataType::USmallInt
878            | SQLDataType::HugeInt
879            | SQLDataType::UHugeInt
880            | SQLDataType::UBigInt
881            | SQLDataType::TimestampNtz{..}
882            | SQLDataType::NamedTable { .. }
883            | SQLDataType::TsVector
884            | SQLDataType::TsQuery
885            | SQLDataType::GeometricType(_)
886            | SQLDataType::DecimalUnsigned(_) // deprecated mysql type
887            | SQLDataType::FloatUnsigned(_) // deprecated mysql type
888            | SQLDataType::RealUnsigned // deprecated mysql type
889            | SQLDataType::DecUnsigned(_) // deprecated mysql type
890            | SQLDataType::DoubleUnsigned(_) // deprecated mysql type
891            | SQLDataType::DoublePrecisionUnsigned // deprecated mysql type
892            => {
893                not_impl_err!("Unsupported SQL type {sql_type}")
894            }
895        }
896    }
897
898    pub(crate) fn object_name_to_table_reference(
899        &self,
900        object_name: ObjectName,
901    ) -> Result<TableReference> {
902        object_name_to_table_reference(
903            object_name,
904            self.options.enable_ident_normalization,
905        )
906    }
907}
908
909/// Create a [`TableReference`] after normalizing the specified ObjectName
910///
911/// Examples
912/// ```text
913/// ['foo']          -> Bare { table: "foo" }
914/// ['"foo.bar"]]    -> Bare { table: "foo.bar" }
915/// ['foo', 'Bar']   -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
916/// ['foo', 'bar']   -> Partial { schema: "foo", table: "bar" }
917/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
918/// ```
919pub fn object_name_to_table_reference(
920    object_name: ObjectName,
921    enable_normalization: bool,
922) -> Result<TableReference> {
923    // Use destructure to make it clear no fields on ObjectName are ignored
924    let ObjectName(object_name_parts) = object_name;
925    let idents = object_name_parts
926        .into_iter()
927        .map(|object_name_part| {
928            object_name_part.as_ident().cloned().ok_or_else(|| {
929                plan_datafusion_err!(
930                    "Expected identifier, but found: {:?}",
931                    object_name_part
932                )
933            })
934        })
935        .collect::<Result<Vec<_>>>()?;
936    idents_to_table_reference(idents, enable_normalization)
937}
938
939struct IdentTaker {
940    normalizer: IdentNormalizer,
941    idents: Vec<Ident>,
942}
943
944/// Take the next identifier from the back of idents, panic'ing if
945/// there are none left
946impl IdentTaker {
947    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
948        Self {
949            normalizer: IdentNormalizer::new(enable_normalization),
950            idents,
951        }
952    }
953
954    fn take(&mut self) -> String {
955        let ident = self.idents.pop().expect("no more identifiers");
956        self.normalizer.normalize(ident)
957    }
958
959    /// Returns the number of remaining identifiers
960    fn len(&self) -> usize {
961        self.idents.len()
962    }
963}
964
965// impl Display for a nicer error message
966impl std::fmt::Display for IdentTaker {
967    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
968        let mut first = true;
969        for ident in self.idents.iter() {
970            if !first {
971                write!(f, ".")?;
972            }
973            write!(f, "{ident}")?;
974            first = false;
975        }
976
977        Ok(())
978    }
979}
980
981/// Create a [`TableReference`] after normalizing the specified identifier
982pub(crate) fn idents_to_table_reference(
983    idents: Vec<Ident>,
984    enable_normalization: bool,
985) -> Result<TableReference> {
986    let mut taker = IdentTaker::new(idents, enable_normalization);
987
988    match taker.len() {
989        1 => {
990            let table = taker.take();
991            Ok(TableReference::bare(table))
992        }
993        2 => {
994            let table = taker.take();
995            let schema = taker.take();
996            Ok(TableReference::partial(schema, table))
997        }
998        3 => {
999            let table = taker.take();
1000            let schema = taker.take();
1001            let catalog = taker.take();
1002            Ok(TableReference::full(catalog, schema, table))
1003        }
1004        _ => plan_err!(
1005            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
1006            taker,
1007            taker.len()
1008        ),
1009    }
1010}
1011
1012/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
1013/// from the provided object identifiers (catalog, schema and table names).
1014pub fn object_name_to_qualifier(
1015    sql_table_name: &ObjectName,
1016    enable_normalization: bool,
1017) -> Result<String> {
1018    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
1019    let normalizer = IdentNormalizer::new(enable_normalization);
1020    sql_table_name
1021        .0
1022        .iter()
1023        .rev()
1024        .zip(columns)
1025        .map(|(object_name_part, column_name)| {
1026            object_name_part
1027                .as_ident()
1028                .map(|ident| {
1029                    format!(
1030                        r#"{} = '{}'"#,
1031                        column_name,
1032                        normalizer.normalize(ident.clone())
1033                    )
1034                })
1035                .ok_or_else(|| {
1036                    plan_datafusion_err!(
1037                        "Expected identifier, but found: {:?}",
1038                        object_name_part
1039                    )
1040                })
1041        })
1042        .collect::<Result<Vec<_>>>()
1043        .map(|parts| parts.join(" AND "))
1044}