Skip to main content

datafusion_sql/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
19use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::TableReference;
27use datafusion_common::config::SqlParserOptions;
28use datafusion_common::datatype::{DataTypeExt, FieldExt};
29use datafusion_common::error::add_possible_columns_to_diag;
30use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
31use datafusion_common::{
32    DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
33    plan_datafusion_err,
34};
35use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
36pub use datafusion_expr::planner::ContextProvider;
37use datafusion_expr::utils::find_column_exprs;
38use datafusion_expr::{Expr, col};
39use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
40use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
41use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
42
/// SQL parser options
///
/// A small, copyable bag of switches that is stored on [`SqlToRel`] and read
/// while planning. Build one with [`ParserOptions::new`] plus the `with_*`
/// setters, or convert it from a [`SqlParserOptions`].
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Whether to parse float as decimal.
    pub parse_float_as_decimal: bool,
    /// Whether to normalize identifiers.
    ///
    /// This flag is what [`SqlToRel::new_with_options`] hands to
    /// [`IdentNormalizer::new`].
    pub enable_ident_normalization: bool,
    /// Whether to support varchar with length.
    ///
    /// When `false`, planning a `VARCHAR` type that carries a length fails
    /// with a plan error.
    pub support_varchar_with_length: bool,
    /// Whether to normalize options value.
    pub enable_options_value_normalization: bool,
    /// Whether to collect spans
    pub collect_spans: bool,
    /// Whether string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
    ///
    /// When `false` those types are mapped to `Utf8` instead.
    pub map_string_types_to_utf8view: bool,
    /// Default null ordering for sorting expressions.
    pub default_null_ordering: NullOrdering,
}
61
62impl ParserOptions {
63    /// Creates a new `ParserOptions` instance with default values.
64    ///
65    /// # Examples
66    ///
67    /// ```
68    /// use datafusion_sql::planner::ParserOptions;
69    /// let opts = ParserOptions::new();
70    /// assert_eq!(opts.parse_float_as_decimal, false);
71    /// assert_eq!(opts.enable_ident_normalization, true);
72    /// ```
73    pub fn new() -> Self {
74        Self {
75            parse_float_as_decimal: false,
76            enable_ident_normalization: true,
77            support_varchar_with_length: true,
78            map_string_types_to_utf8view: true,
79            enable_options_value_normalization: false,
80            collect_spans: false,
81            // By default, `nulls_max` is used to follow Postgres's behavior.
82            // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
83            default_null_ordering: NullOrdering::NullsMax,
84        }
85    }
86
87    /// Sets the `parse_float_as_decimal` option.
88    ///
89    /// # Examples
90    ///
91    /// ```
92    /// use datafusion_sql::planner::ParserOptions;
93    /// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
94    /// assert_eq!(opts.parse_float_as_decimal, true);
95    /// ```
96    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
97        self.parse_float_as_decimal = value;
98        self
99    }
100
101    /// Sets the `enable_ident_normalization` option.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use datafusion_sql::planner::ParserOptions;
107    /// let opts = ParserOptions::new().with_enable_ident_normalization(false);
108    /// assert_eq!(opts.enable_ident_normalization, false);
109    /// ```
110    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
111        self.enable_ident_normalization = value;
112        self
113    }
114
115    /// Sets the `support_varchar_with_length` option.
116    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
117        self.support_varchar_with_length = value;
118        self
119    }
120
121    /// Sets the `map_string_types_to_utf8view` option.
122    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
123        self.map_string_types_to_utf8view = value;
124        self
125    }
126
127    /// Sets the `enable_options_value_normalization` option.
128    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
129        self.enable_options_value_normalization = value;
130        self
131    }
132
133    /// Sets the `collect_spans` option.
134    pub fn with_collect_spans(mut self, value: bool) -> Self {
135        self.collect_spans = value;
136        self
137    }
138
139    /// Sets the `default_null_ordering` option.
140    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
141        self.default_null_ordering = value;
142        self
143    }
144}
145
146impl Default for ParserOptions {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl From<&SqlParserOptions> for ParserOptions {
153    fn from(options: &SqlParserOptions) -> Self {
154        Self {
155            parse_float_as_decimal: options.parse_float_as_decimal,
156            enable_ident_normalization: options.enable_ident_normalization,
157            support_varchar_with_length: options.support_varchar_with_length,
158            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
159            enable_options_value_normalization: options
160                .enable_options_value_normalization,
161            collect_spans: options.collect_spans,
162            default_null_ordering: options.default_null_ordering.as_str().into(),
163        }
164    }
165}
166
/// Represents the null ordering for sorting expressions.
///
/// The variants describe where nulls are placed relative to non-null values;
/// [`NullOrdering::nulls_first`] turns a variant plus a sort direction into
/// the concrete "nulls first / nulls last" decision.
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
    /// Nulls appear last in ascending order and first in descending order.
    NullsMax,
    /// Nulls appear first in ascending order and last in descending order.
    NullsMin,
    /// Nulls appear first, regardless of sort direction.
    NullsFirst,
    /// Nulls appear last, regardless of sort direction.
    NullsLast,
}

impl NullOrdering {
    /// Decides whether nulls are placed first for a sort in the given direction.
    ///
    /// `asc` is `true` for an ascending sort and `false` for a descending one.
    ///
    /// # Returns
    /// * `true` if nulls should appear first.
    /// * `false` if nulls should appear last.
    pub fn nulls_first(&self, asc: bool) -> bool {
        match (self, asc) {
            (Self::NullsFirst, _) | (Self::NullsMax, false) | (Self::NullsMin, true) => {
                true
            }
            (Self::NullsLast, _) | (Self::NullsMax, true) | (Self::NullsMin, false) => {
                false
            }
        }
    }
}
195
196impl FromStr for NullOrdering {
197    type Err = DataFusionError;
198
199    fn from_str(s: &str) -> Result<Self> {
200        match s {
201            "nulls_max" => Ok(Self::NullsMax),
202            "nulls_min" => Ok(Self::NullsMin),
203            "nulls_first" => Ok(Self::NullsFirst),
204            "nulls_last" => Ok(Self::NullsLast),
205            _ => plan_err!(
206                "Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"
207            ),
208        }
209    }
210}
211
212impl From<&str> for NullOrdering {
213    fn from(s: &str) -> Self {
214        Self::from_str(s).unwrap_or(Self::NullsMax)
215    }
216}
217
/// Ident Normalizer
///
/// Converts a SQL [`Ident`] into the `String` name used by the planner,
/// optionally applying normalization.
#[derive(Debug)]
pub struct IdentNormalizer {
    /// When `true`, identifiers are passed through
    /// `crate::utils::normalize_ident`; when `false`, the identifier's raw
    /// `value` is used unchanged.
    normalize: bool,
}
223
224impl Default for IdentNormalizer {
225    fn default() -> Self {
226        Self { normalize: true }
227    }
228}
229
230impl IdentNormalizer {
231    pub fn new(normalize: bool) -> Self {
232        Self { normalize }
233    }
234
235    pub fn normalize(&self, ident: Ident) -> String {
236        if self.normalize {
237            crate::utils::normalize_ident(ident)
238        } else {
239            ident.value
240        }
241    }
242}
243
/// Struct to store the states used by the Planner. The Planner will leverage the states
/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expressions (CTEs) provided with the WITH clause,
/// Parameter Data Types provided with the PREPARE statement, and the query schema of the
/// outer query plan.
///
/// # Cloning
///
/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned.
/// This helps resolve scoping issues of CTEs.
/// By using cloning, a subquery can inherit CTEs from the outer query
/// and can also define its own private CTEs without affecting the outer query.
#[derive(Debug, Clone)]
pub struct PlannerContext {
    /// Data types for numbered parameters ($1, $2, etc), if supplied
    /// in `PREPARE` statement
    prepare_param_data_types: Arc<Vec<FieldRef>>,
    /// Map of CTE name to logical plan of the WITH clause.
    /// Use `Arc<LogicalPlan>` to allow cheap cloning
    ctes: HashMap<String, Arc<LogicalPlan>>,

    /// The schemas of the outer query relations, used to resolve outer referenced
    /// columns in a subquery (recursive aware). This is a stack: the outermost
    /// relation is the first entry and the adjacent outer relation is the last.
    outer_queries_schemas_stack: Vec<DFSchemaRef>,
    /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
    /// FROM clauses, this should become a suffix of the `outer_query_schema`.
    outer_from_schema: Option<DFSchemaRef>,
    /// The query schema defined by the table
    create_table_schema: Option<DFSchemaRef>,
}
274
275impl Default for PlannerContext {
276    fn default() -> Self {
277        Self::new()
278    }
279}
280
281impl PlannerContext {
282    /// Create an empty PlannerContext
283    pub fn new() -> Self {
284        Self {
285            prepare_param_data_types: Arc::new(vec![]),
286            ctes: HashMap::new(),
287            outer_queries_schemas_stack: vec![],
288            outer_from_schema: None,
289            create_table_schema: None,
290        }
291    }
292
293    /// Update the PlannerContext with provided prepare_param_data_types
294    pub fn with_prepare_param_data_types(
295        mut self,
296        prepare_param_data_types: Vec<FieldRef>,
297    ) -> Self {
298        self.prepare_param_data_types = prepare_param_data_types.into();
299        self
300    }
301
302    /// Return the stack of outer relations' schemas, the outer most
303    /// relation are at the first entry
304    pub fn outer_queries_schemas(&self) -> &[DFSchemaRef] {
305        &self.outer_queries_schemas_stack
306    }
307
308    /// Return an iterator of the subquery relations' schemas, innermost
309    /// relation is returned first.
310    ///
311    /// This order corresponds to the order of resolution when looking up column
312    /// references in subqueries, which start from the innermost relation and
313    /// then look up the outer relations one by one until a match is found or no
314    /// more outer relation exist.
315    ///
316    /// NOTE this is *REVERSED* order of [`Self::outer_queries_schemas`]
317    ///
318    /// This is useful to resolve the column reference in the subquery by
319    /// looking up the outer query schemas one by one.
320    pub fn outer_schemas_iter(&self) -> impl Iterator<Item = &DFSchemaRef> {
321        self.outer_queries_schemas_stack.iter().rev()
322    }
323
324    /// Sets the outer query schema, returning the existing one, if
325    /// any
326    pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
327        self.outer_queries_schemas_stack.push(schema);
328    }
329
330    /// The schema of the adjacent outer relation
331    pub fn latest_outer_query_schema(&self) -> Option<&DFSchemaRef> {
332        self.outer_queries_schemas_stack.last()
333    }
334
335    /// Remove the schema of the adjacent outer relation
336    pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
337        self.outer_queries_schemas_stack.pop()
338    }
339
340    pub fn set_table_schema(
341        &mut self,
342        mut schema: Option<DFSchemaRef>,
343    ) -> Option<DFSchemaRef> {
344        std::mem::swap(&mut self.create_table_schema, &mut schema);
345        schema
346    }
347
348    pub fn table_schema(&self) -> Option<DFSchemaRef> {
349        self.create_table_schema.clone()
350    }
351
352    // Return a clone of the outer FROM schema
353    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
354        self.outer_from_schema.clone()
355    }
356
357    /// Sets the outer FROM schema, returning the existing one, if any
358    pub fn set_outer_from_schema(
359        &mut self,
360        mut schema: Option<DFSchemaRef>,
361    ) -> Option<DFSchemaRef> {
362        std::mem::swap(&mut self.outer_from_schema, &mut schema);
363        schema
364    }
365
366    /// Extends the FROM schema, returning the existing one, if any
367    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
368        match self.outer_from_schema.as_mut() {
369            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
370            None => self.outer_from_schema = Some(Arc::clone(schema)),
371        };
372        Ok(())
373    }
374
375    /// Return the types of parameters (`$1`, `$2`, etc) if known
376    pub fn prepare_param_data_types(&self) -> &[FieldRef] {
377        &self.prepare_param_data_types
378    }
379
380    /// Returns true if there is a Common Table Expression (CTE) /
381    /// Subquery for the specified name
382    pub fn contains_cte(&self, cte_name: &str) -> bool {
383        self.ctes.contains_key(cte_name)
384    }
385
386    /// Inserts a LogicalPlan for the Common Table Expression (CTE) /
387    /// Subquery for the specified name
388    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
389        let cte_name = cte_name.into();
390        self.ctes.insert(cte_name, Arc::new(plan));
391    }
392
393    /// Return a plan for the Common Table Expression (CTE) / Subquery for the
394    /// specified name
395    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
396        self.ctes.get(cte_name).map(|cte| cte.as_ref())
397    }
398
399    /// Remove the plan of CTE / Subquery for the specified name
400    pub(super) fn remove_cte(&mut self, cte_name: &str) {
401        self.ctes.remove(cte_name);
402    }
403}
404
/// SQL query planner and binder
///
/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
///
/// You can control the behavior of the planner by providing [`ParserOptions`].
///
/// It performs the following tasks:
///
/// 1. Name and type resolution (called "binding" in other systems). This
///    phase looks up table and column names using the [`ContextProvider`].
/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
///
/// It does not perform type coercion, or perform optimization, which are done
/// by subsequent passes.
///
/// Key interfaces are:
/// * [`Self::sql_statement_to_plan`]: Convert a statement
///   (e.g. `SELECT ...`) into a [`LogicalPlan`]
/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
pub struct SqlToRel<'a, S: ContextProvider> {
    /// Borrowed provider consulted during planning (e.g. for configuration
    /// options and an optional type planner).
    pub(crate) context_provider: &'a S,
    /// Parser options in effect for this planner.
    pub(crate) options: ParserOptions,
    /// Normalizer built from `options.enable_ident_normalization`.
    pub(crate) ident_normalizer: IdentNormalizer,
}
429
430impl<'a, S: ContextProvider> SqlToRel<'a, S> {
431    /// Create a new query planner.
432    ///
433    /// The query planner derives the parser options from the context provider.
434    pub fn new(context_provider: &'a S) -> Self {
435        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
436        Self::new_with_options(context_provider, parser_options)
437    }
438
439    /// Create a new query planner with the given parser options.
440    ///
441    /// The query planner ignores the parser options from the context provider
442    /// and uses the given parser options instead.
443    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
444        let ident_normalize = options.enable_ident_normalization;
445
446        SqlToRel {
447            context_provider,
448            options,
449            ident_normalizer: IdentNormalizer::new(ident_normalize),
450        }
451    }
452
453    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
454        let mut fields = Vec::with_capacity(columns.len());
455
456        for column in columns {
457            let data_type = self.convert_data_type_to_field(&column.data_type)?;
458            let not_nullable = column
459                .options
460                .iter()
461                .any(|x| x.option == ColumnOption::NotNull);
462            fields.push(
463                data_type
464                    .as_ref()
465                    .clone()
466                    .with_name(self.ident_normalizer.normalize(column.name))
467                    .with_nullable(!not_nullable),
468            );
469        }
470
471        Ok(Schema::new(fields))
472    }
473
474    /// Returns a vector of (column_name, default_expr) pairs
475    pub(super) fn build_column_defaults(
476        &self,
477        columns: &Vec<SQLColumnDef>,
478        planner_context: &mut PlannerContext,
479    ) -> Result<Vec<(String, Expr)>> {
480        let mut column_defaults = vec![];
481        // Default expressions are restricted, column references are not allowed
482        let empty_schema = DFSchema::empty();
483        let error_desc = |e: DataFusionError| match e {
484            DataFusionError::SchemaError(ref err, _)
485                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
486            {
487                plan_datafusion_err!(
488                    "Column reference is not allowed in the DEFAULT expression : {}",
489                    e
490                )
491            }
492            _ => e,
493        };
494
495        for column in columns {
496            if let Some(default_sql_expr) =
497                column.options.iter().find_map(|o| match &o.option {
498                    ColumnOption::Default(expr) => Some(expr),
499                    _ => None,
500                })
501            {
502                let default_expr = self
503                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
504                    .map_err(error_desc)?;
505                column_defaults.push((
506                    self.ident_normalizer.normalize(column.name.clone()),
507                    default_expr,
508                ));
509            }
510        }
511        Ok(column_defaults)
512    }
513
514    /// Apply the given TableAlias to the input plan
515    pub(crate) fn apply_table_alias(
516        &self,
517        plan: LogicalPlan,
518        alias: TableAlias,
519    ) -> Result<LogicalPlan> {
520        let idents = alias.columns.into_iter().map(|c| c.name).collect();
521        let plan = self.apply_expr_alias(plan, idents)?;
522
523        LogicalPlanBuilder::from(plan)
524            .alias(TableReference::bare(
525                self.ident_normalizer.normalize(alias.name),
526            ))?
527            .build()
528    }
529
530    pub(crate) fn apply_expr_alias(
531        &self,
532        plan: LogicalPlan,
533        idents: Vec<Ident>,
534    ) -> Result<LogicalPlan> {
535        if idents.is_empty() {
536            Ok(plan)
537        } else if idents.len() != plan.schema().fields().len() {
538            plan_err!(
539                "Source table contains {} columns but only {} \
540                names given as column alias",
541                plan.schema().fields().len(),
542                idents.len()
543            )
544        } else {
545            let fields = plan.schema().fields().clone();
546            LogicalPlanBuilder::from(plan)
547                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
548                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
549                }))?
550                .build()
551        }
552    }
553
    /// Validate the schema provides all of the columns referenced in the expressions.
    ///
    /// Every column expression found in `exprs` by `find_column_exprs` is
    /// looked up in `schema`; the first failure stops the check.
    ///
    /// # Errors
    /// * A schema error when a column cannot be resolved. "Field not found"
    ///   errors additionally get a [`Diagnostic`] attached that names the
    ///   missing column (and its relation, when qualified) and lists
    ///   possible columns.
    /// * An internal error if `find_column_exprs` yields a non-column
    ///   expression.
    pub(crate) fn validate_schema_satisfies_exprs(
        &self,
        schema: &DFSchema,
        exprs: &[Expr],
    ) -> Result<()> {
        find_column_exprs(exprs)
            .iter()
            .try_for_each(|col| match col {
                Expr::Column(col) => match &col.relation {
                    // Qualified reference: relation and name must both match.
                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
                    // Unqualified reference: any field with that name is enough.
                    None => {
                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
                            Ok(())
                        } else {
                            Err(field_not_found(
                                col.relation.clone(),
                                col.name.as_str(),
                                schema,
                            ))
                        }
                    }
                }
                // Enrich "field not found" errors with a diagnostic; every
                // other error is passed through untouched.
                .map_err(|err: DataFusionError| match &err {
                    DataFusionError::SchemaError(inner, _)
                        if matches!(
                            inner.as_ref(),
                            SchemaError::FieldNotFound { .. }
                        ) =>
                    {
                        // The guard above already established the variant, so
                        // the `else` branch cannot be taken.
                        let SchemaError::FieldNotFound {
                            field,
                            valid_fields,
                        } = inner.as_ref()
                        else {
                            unreachable!()
                        };
                        // The message mentions the relation only for
                        // qualified column references.
                        let mut diagnostic = if let Some(relation) = &col.relation {
                            Diagnostic::new_error(
                                format!(
                                    "column '{}' not found in '{}'",
                                    &col.name, relation
                                ),
                                col.spans().first(),
                            )
                        } else {
                            Diagnostic::new_error(
                                format!("column '{}' not found", &col.name),
                                col.spans().first(),
                            )
                        };
                        add_possible_columns_to_diag(
                            &mut diagnostic,
                            field,
                            valid_fields,
                        );
                        err.with_diagnostic(diagnostic)
                    }
                    _ => err,
                }),
                _ => internal_err!("Not a column"),
            })
    }
617
618    pub(crate) fn convert_data_type_to_field(
619        &self,
620        sql_type: &SQLDataType,
621    ) -> Result<FieldRef> {
622        // First check if any of the registered type_planner can handle this type
623        if let Some(type_planner) = self.context_provider.get_type_planner()
624            && let Some(data_type) = type_planner.plan_type(sql_type)?
625        {
626            return Ok(data_type.into_nullable_field_ref());
627        }
628
629        // If no type_planner can handle this type, use the default conversion
630        match sql_type {
631            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
632                // Arrays may be multi-dimensional.
633                Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
634            }
635            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
636                inner_sql_type,
637                maybe_array_size,
638            )) => {
639                let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
640                if let Some(array_size) = maybe_array_size {
641                    let array_size: i32 = (*array_size).try_into().map_err(|_| {
642                        plan_datafusion_err!(
643                            "Array size must be a positive 32 bit integer, got {array_size}"
644                        )
645                    })?;
646                    Ok(inner_field.into_fixed_size_list(array_size))
647                } else {
648                    Ok(inner_field.into_list())
649                }
650            }
651            SQLDataType::Array(ArrayElemTypeDef::None) => {
652                not_impl_err!("Arrays with unspecified type is not supported")
653            }
654            other => Ok(self
655                .convert_simple_data_type(other)?
656                .into_nullable_field_ref()),
657        }
658    }
659
660    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
661        match sql_type {
662            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
663            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
664            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
665            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
666                Ok(DataType::Int32)
667            }
668            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
669            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
670            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
671                Ok(DataType::UInt16)
672            }
673            SQLDataType::IntUnsigned(_)
674            | SQLDataType::IntegerUnsigned(_)
675            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
676            SQLDataType::Varchar(length) => {
677                match (length, self.options.support_varchar_with_length) {
678                    (Some(_), false) => plan_err!(
679                        "does not support Varchar with length, \
680                    please set `support_varchar_with_length` to be true"
681                    ),
682                    _ => {
683                        if self.options.map_string_types_to_utf8view {
684                            Ok(DataType::Utf8View)
685                        } else {
686                            Ok(DataType::Utf8)
687                        }
688                    }
689                }
690            }
691            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
692                Ok(DataType::UInt64)
693            }
694            SQLDataType::Float(_) => Ok(DataType::Float32),
695            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
696            SQLDataType::Double(ExactNumberInfo::None)
697            | SQLDataType::DoublePrecision
698            | SQLDataType::Float8 => Ok(DataType::Float64),
699            SQLDataType::Double(
700                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
701            ) => {
702                not_impl_err!(
703                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
704                )
705            }
706            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
707                if self.options.map_string_types_to_utf8view {
708                    Ok(DataType::Utf8View)
709                } else {
710                    Ok(DataType::Utf8)
711                }
712            }
713            SQLDataType::Timestamp(precision, tz_info)
714                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
715            {
716                let tz = if *tz_info == TimezoneInfo::Tz
717                    || *tz_info == TimezoneInfo::WithTimeZone
718                {
719                    // Timestamp With Time Zone
720                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
721                    // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
722                    self.context_provider.options().execution.time_zone.clone()
723                } else {
724                    // Timestamp Without Time zone
725                    None
726                };
727                let precision = match precision {
728                    Some(0) => TimeUnit::Second,
729                    Some(3) => TimeUnit::Millisecond,
730                    Some(6) => TimeUnit::Microsecond,
731                    None | Some(9) => TimeUnit::Nanosecond,
732                    _ => unreachable!(),
733                };
734                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
735            }
736            SQLDataType::Date => Ok(DataType::Date32),
737            SQLDataType::Time(None, tz_info) => {
738                if *tz_info == TimezoneInfo::None
739                    || *tz_info == TimezoneInfo::WithoutTimeZone
740                {
741                    Ok(DataType::Time64(TimeUnit::Nanosecond))
742                } else {
743                    // We don't support TIMETZ and TIME WITH TIME ZONE for now
744                    not_impl_err!("Unsupported SQL type {sql_type}")
745                }
746            }
747            SQLDataType::Numeric(exact_number_info)
748            | SQLDataType::Decimal(exact_number_info) => {
749                let (precision, scale) = match *exact_number_info {
750                    ExactNumberInfo::None => (None, None),
751                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
752                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
753                        (Some(precision), Some(scale))
754                    }
755                };
756                make_decimal_type(precision, scale.map(|s| s as u64))
757            }
758            SQLDataType::Bytea => Ok(DataType::Binary),
759            SQLDataType::Interval { fields, precision } => {
760                if fields.is_some() || precision.is_some() {
761                    return not_impl_err!("Unsupported SQL type {sql_type}");
762                }
763                Ok(DataType::Interval(IntervalUnit::MonthDayNano))
764            }
765            SQLDataType::Struct(fields, _) => {
766                let fields = fields
767                    .iter()
768                    .enumerate()
769                    .map(|(idx, sql_struct_field)| {
770                        let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
771                        let field_name = match &sql_struct_field.field_name {
772                            Some(ident) => ident.clone(),
773                            None => Ident::new(format!("c{idx}")),
774                        };
775                        Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
776                    })
777                    .collect::<Result<Vec<_>>>()?;
778                Ok(DataType::Struct(Fields::from(fields)))
779            }
780            SQLDataType::Nvarchar(_)
781            | SQLDataType::JSON
782            | SQLDataType::Uuid
783            | SQLDataType::Binary(_)
784            | SQLDataType::Varbinary(_)
785            | SQLDataType::Blob(_)
786            | SQLDataType::Datetime(_)
787            | SQLDataType::Regclass
788            | SQLDataType::Custom(_, _)
789            | SQLDataType::Array(_)
790            | SQLDataType::Enum(_, _)
791            | SQLDataType::Set(_)
792            | SQLDataType::MediumInt(_)
793            | SQLDataType::MediumIntUnsigned(_)
794            | SQLDataType::Character(_)
795            | SQLDataType::CharacterVarying(_)
796            | SQLDataType::CharVarying(_)
797            | SQLDataType::CharacterLargeObject(_)
798            | SQLDataType::CharLargeObject(_)
799            | SQLDataType::Timestamp(_, _)
800            | SQLDataType::Time(Some(_), _)
801            | SQLDataType::Dec(_)
802            | SQLDataType::BigNumeric(_)
803            | SQLDataType::BigDecimal(_)
804            | SQLDataType::Clob(_)
805            | SQLDataType::Bytes(_)
806            | SQLDataType::Int64
807            | SQLDataType::Float64
808            | SQLDataType::JSONB
809            | SQLDataType::Unspecified
810            | SQLDataType::Int16
811            | SQLDataType::Int32
812            | SQLDataType::Int128
813            | SQLDataType::Int256
814            | SQLDataType::UInt8
815            | SQLDataType::UInt16
816            | SQLDataType::UInt32
817            | SQLDataType::UInt64
818            | SQLDataType::UInt128
819            | SQLDataType::UInt256
820            | SQLDataType::Float32
821            | SQLDataType::Date32
822            | SQLDataType::Datetime64(_, _)
823            | SQLDataType::FixedString(_)
824            | SQLDataType::Map(_, _)
825            | SQLDataType::Tuple(_)
826            | SQLDataType::Nested(_)
827            | SQLDataType::Union(_)
828            | SQLDataType::Nullable(_)
829            | SQLDataType::LowCardinality(_)
830            | SQLDataType::Trigger
831            | SQLDataType::TinyBlob
832            | SQLDataType::MediumBlob
833            | SQLDataType::LongBlob
834            | SQLDataType::TinyText
835            | SQLDataType::MediumText
836            | SQLDataType::LongText
837            | SQLDataType::Bit(_)
838            | SQLDataType::BitVarying(_)
839            | SQLDataType::Signed
840            | SQLDataType::SignedInteger
841            | SQLDataType::Unsigned
842            | SQLDataType::UnsignedInteger
843            | SQLDataType::AnyType
844            | SQLDataType::Table(_)
845            | SQLDataType::VarBit(_)
846            | SQLDataType::UTinyInt
847            | SQLDataType::USmallInt
848            | SQLDataType::HugeInt
849            | SQLDataType::UHugeInt
850            | SQLDataType::UBigInt
851            | SQLDataType::TimestampNtz{..}
852            | SQLDataType::NamedTable { .. }
853            | SQLDataType::TsVector
854            | SQLDataType::TsQuery
855            | SQLDataType::GeometricType(_)
856            | SQLDataType::DecimalUnsigned(_) // deprecated mysql type
857            | SQLDataType::FloatUnsigned(_) // deprecated mysql type
858            | SQLDataType::RealUnsigned // deprecated mysql type
859            | SQLDataType::DecUnsigned(_) // deprecated mysql type
860            | SQLDataType::DoubleUnsigned(_) // deprecated mysql type
861            | SQLDataType::DoublePrecisionUnsigned // deprecated mysql type
862            => {
863                not_impl_err!("Unsupported SQL type {sql_type}")
864            }
865        }
866    }
867
868    pub(crate) fn object_name_to_table_reference(
869        &self,
870        object_name: ObjectName,
871    ) -> Result<TableReference> {
872        object_name_to_table_reference(
873            object_name,
874            self.options.enable_ident_normalization,
875        )
876    }
877}
878
879/// Create a [`TableReference`] after normalizing the specified ObjectName
880///
881/// Examples
882/// ```text
883/// ['foo']          -> Bare { table: "foo" }
884/// ['"foo.bar"]]    -> Bare { table: "foo.bar" }
885/// ['foo', 'Bar']   -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
886/// ['foo', 'bar']   -> Partial { schema: "foo", table: "bar" }
887/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
888/// ```
889pub fn object_name_to_table_reference(
890    object_name: ObjectName,
891    enable_normalization: bool,
892) -> Result<TableReference> {
893    // Use destructure to make it clear no fields on ObjectName are ignored
894    let ObjectName(object_name_parts) = object_name;
895    let idents = object_name_parts
896        .into_iter()
897        .map(|object_name_part| {
898            object_name_part.as_ident().cloned().ok_or_else(|| {
899                plan_datafusion_err!(
900                    "Expected identifier, but found: {:?}",
901                    object_name_part
902                )
903            })
904        })
905        .collect::<Result<Vec<_>>>()?;
906    idents_to_table_reference(idents, enable_normalization)
907}
908
909struct IdentTaker {
910    normalizer: IdentNormalizer,
911    idents: Vec<Ident>,
912}
913
914/// Take the next identifier from the back of idents, panic'ing if
915/// there are none left
916impl IdentTaker {
917    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
918        Self {
919            normalizer: IdentNormalizer::new(enable_normalization),
920            idents,
921        }
922    }
923
924    fn take(&mut self) -> String {
925        let ident = self.idents.pop().expect("no more identifiers");
926        self.normalizer.normalize(ident)
927    }
928
929    /// Returns the number of remaining identifiers
930    fn len(&self) -> usize {
931        self.idents.len()
932    }
933}
934
935// impl Display for a nicer error message
936impl std::fmt::Display for IdentTaker {
937    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
938        let mut first = true;
939        for ident in self.idents.iter() {
940            if !first {
941                write!(f, ".")?;
942            }
943            write!(f, "{ident}")?;
944            first = false;
945        }
946
947        Ok(())
948    }
949}
950
951/// Create a [`TableReference`] after normalizing the specified identifier
952pub(crate) fn idents_to_table_reference(
953    idents: Vec<Ident>,
954    enable_normalization: bool,
955) -> Result<TableReference> {
956    let mut taker = IdentTaker::new(idents, enable_normalization);
957
958    match taker.len() {
959        1 => {
960            let table = taker.take();
961            Ok(TableReference::bare(table))
962        }
963        2 => {
964            let table = taker.take();
965            let schema = taker.take();
966            Ok(TableReference::partial(schema, table))
967        }
968        3 => {
969            let table = taker.take();
970            let schema = taker.take();
971            let catalog = taker.take();
972            Ok(TableReference::full(catalog, schema, table))
973        }
974        _ => plan_err!(
975            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
976            taker,
977            taker.len()
978        ),
979    }
980}
981
982/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
983/// from the provided object identifiers (catalog, schema and table names).
984pub fn object_name_to_qualifier(
985    sql_table_name: &ObjectName,
986    enable_normalization: bool,
987) -> Result<String> {
988    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
989    let normalizer = IdentNormalizer::new(enable_normalization);
990    sql_table_name
991        .0
992        .iter()
993        .rev()
994        .zip(columns)
995        .map(|(object_name_part, column_name)| {
996            object_name_part
997                .as_ident()
998                .map(|ident| {
999                    format!(
1000                        r#"{} = '{}'"#,
1001                        column_name,
1002                        normalizer.normalize(ident.clone())
1003                    )
1004                })
1005                .ok_or_else(|| {
1006                    plan_datafusion_err!(
1007                        "Expected identifier, but found: {:?}",
1008                        object_name_part
1009                    )
1010                })
1011        })
1012        .collect::<Result<Vec<_>>>()
1013        .map(|parts| parts.join(" AND "))
1014}