datafusion_sql/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
19use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::TableReference;
27use datafusion_common::config::SqlParserOptions;
28use datafusion_common::datatype::{DataTypeExt, FieldExt};
29use datafusion_common::error::add_possible_columns_to_diag;
30use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
31use datafusion_common::{
32    DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
33    plan_datafusion_err,
34};
35use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
36pub use datafusion_expr::planner::ContextProvider;
37use datafusion_expr::utils::find_column_exprs;
38use datafusion_expr::{Expr, col};
39use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
40use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
41use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
42
/// SQL parser options
///
/// These options control how [`SqlToRel`] plans SQL. The default value of
/// every option is defined by [`ParserOptions::new`].
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Whether to parse float as decimal.
    pub parse_float_as_decimal: bool,
    /// Whether to normalize identifiers.
    ///
    /// [`SqlToRel::new_with_options`] uses this flag to configure its
    /// [`IdentNormalizer`].
    pub enable_ident_normalization: bool,
    /// Whether to support varchar with length.
    ///
    /// When `false`, planning a `VARCHAR` type that carries a length fails
    /// with a planning error.
    pub support_varchar_with_length: bool,
    /// Whether to normalize options value.
    pub enable_options_value_normalization: bool,
    /// Whether to collect spans
    pub collect_spans: bool,
    /// Whether string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
    ///
    /// When `false`, those types are mapped to `Utf8` instead.
    pub map_string_types_to_utf8view: bool,
    /// Default null ordering for sorting expressions.
    pub default_null_ordering: NullOrdering,
}
61
62impl ParserOptions {
63    /// Creates a new `ParserOptions` instance with default values.
64    ///
65    /// # Examples
66    ///
67    /// ```
68    /// use datafusion_sql::planner::ParserOptions;
69    /// let opts = ParserOptions::new();
70    /// assert_eq!(opts.parse_float_as_decimal, false);
71    /// assert_eq!(opts.enable_ident_normalization, true);
72    /// ```
73    pub fn new() -> Self {
74        Self {
75            parse_float_as_decimal: false,
76            enable_ident_normalization: true,
77            support_varchar_with_length: true,
78            map_string_types_to_utf8view: true,
79            enable_options_value_normalization: false,
80            collect_spans: false,
81            // By default, `nulls_max` is used to follow Postgres's behavior.
82            // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
83            default_null_ordering: NullOrdering::NullsMax,
84        }
85    }
86
87    /// Sets the `parse_float_as_decimal` option.
88    ///
89    /// # Examples
90    ///
91    /// ```
92    /// use datafusion_sql::planner::ParserOptions;
93    /// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
94    /// assert_eq!(opts.parse_float_as_decimal, true);
95    /// ```
96    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
97        self.parse_float_as_decimal = value;
98        self
99    }
100
101    /// Sets the `enable_ident_normalization` option.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use datafusion_sql::planner::ParserOptions;
107    /// let opts = ParserOptions::new().with_enable_ident_normalization(false);
108    /// assert_eq!(opts.enable_ident_normalization, false);
109    /// ```
110    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
111        self.enable_ident_normalization = value;
112        self
113    }
114
115    /// Sets the `support_varchar_with_length` option.
116    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
117        self.support_varchar_with_length = value;
118        self
119    }
120
121    /// Sets the `map_string_types_to_utf8view` option.
122    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
123        self.map_string_types_to_utf8view = value;
124        self
125    }
126
127    /// Sets the `enable_options_value_normalization` option.
128    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
129        self.enable_options_value_normalization = value;
130        self
131    }
132
133    /// Sets the `collect_spans` option.
134    pub fn with_collect_spans(mut self, value: bool) -> Self {
135        self.collect_spans = value;
136        self
137    }
138
139    /// Sets the `default_null_ordering` option.
140    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
141        self.default_null_ordering = value;
142        self
143    }
144}
145
146impl Default for ParserOptions {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl From<&SqlParserOptions> for ParserOptions {
153    fn from(options: &SqlParserOptions) -> Self {
154        Self {
155            parse_float_as_decimal: options.parse_float_as_decimal,
156            enable_ident_normalization: options.enable_ident_normalization,
157            support_varchar_with_length: options.support_varchar_with_length,
158            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
159            enable_options_value_normalization: options
160                .enable_options_value_normalization,
161            collect_spans: options.collect_spans,
162            default_null_ordering: options.default_null_ordering.as_str().into(),
163        }
164    }
165}
166
/// Represents the null ordering for sorting expressions.
///
/// See [`NullOrdering::nulls_first`] for how each variant combines with the
/// sort direction.
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
    /// Nulls appear last in ascending order and first in descending order.
    NullsMax,
    /// Nulls appear first in ascending order and last in descending order.
    NullsMin,
    /// Nulls appear first, regardless of the sort direction.
    NullsFirst,
    /// Nulls appear last, regardless of the sort direction.
    NullsLast,
}
179
180impl NullOrdering {
181    /// Evaluates the null ordering based on the given ascending flag.
182    ///
183    /// # Returns
184    /// * `true` if nulls should appear first.
185    /// * `false` if nulls should appear last.
186    pub fn nulls_first(&self, asc: bool) -> bool {
187        match self {
188            Self::NullsMax => !asc,
189            Self::NullsMin => asc,
190            Self::NullsFirst => true,
191            Self::NullsLast => false,
192        }
193    }
194}
195
196impl FromStr for NullOrdering {
197    type Err = DataFusionError;
198
199    fn from_str(s: &str) -> Result<Self> {
200        match s {
201            "nulls_max" => Ok(Self::NullsMax),
202            "nulls_min" => Ok(Self::NullsMin),
203            "nulls_first" => Ok(Self::NullsFirst),
204            "nulls_last" => Ok(Self::NullsLast),
205            _ => plan_err!(
206                "Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"
207            ),
208        }
209    }
210}
211
212impl From<&str> for NullOrdering {
213    fn from(s: &str) -> Self {
214        Self::from_str(s).unwrap_or(Self::NullsMax)
215    }
216}
217
/// Ident Normalizer
///
/// Converts a SQL [`Ident`] into the `String` name used by the planner,
/// optionally normalizing it first.
#[derive(Debug)]
pub struct IdentNormalizer {
    /// When `true`, identifiers are passed through
    /// `crate::utils::normalize_ident`; when `false`, the identifier's raw
    /// `value` is used unchanged.
    normalize: bool,
}
223
224impl Default for IdentNormalizer {
225    fn default() -> Self {
226        Self { normalize: true }
227    }
228}
229
230impl IdentNormalizer {
231    pub fn new(normalize: bool) -> Self {
232        Self { normalize }
233    }
234
235    pub fn normalize(&self, ident: Ident) -> String {
236        if self.normalize {
237            crate::utils::normalize_ident(ident)
238        } else {
239            ident.value
240        }
241    }
242}
243
/// Struct to store the states used by the Planner. The Planner will leverage the states
/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
/// Parameter Data Types provided with PREPARE statement and the query schema of the
/// outer query plan.
///
/// # Cloning
///
/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned.
/// This helps resolve scoping issues of CTEs.
/// By using cloning, a subquery can inherit CTEs from the outer query
/// and can also define its own private CTEs without affecting the outer query.
#[derive(Debug, Clone)]
pub struct PlannerContext {
    /// Data types for numbered parameters ($1, $2, etc), if supplied
    /// in `PREPARE` statement
    prepare_param_data_types: Arc<Vec<FieldRef>>,
    /// Map of CTE name to logical plan of the WITH clause.
    /// Use `Arc<LogicalPlan>` to allow cheap cloning
    ctes: HashMap<String, Arc<LogicalPlan>>,
    /// The query schema of the outer query plan, used to resolve the columns in subquery
    outer_query_schema: Option<DFSchemaRef>,
    /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
    /// FROM clauses, this should become a suffix of the `outer_query_schema`.
    outer_from_schema: Option<DFSchemaRef>,
    /// The query schema defined by the table; read and replaced through
    /// `table_schema` / `set_table_schema`.
    create_table_schema: Option<DFSchemaRef>,
}
272
273impl Default for PlannerContext {
274    fn default() -> Self {
275        Self::new()
276    }
277}
278
279impl PlannerContext {
280    /// Create an empty PlannerContext
281    pub fn new() -> Self {
282        Self {
283            prepare_param_data_types: Arc::new(vec![]),
284            ctes: HashMap::new(),
285            outer_query_schema: None,
286            outer_from_schema: None,
287            create_table_schema: None,
288        }
289    }
290
291    /// Update the PlannerContext with provided prepare_param_data_types
292    pub fn with_prepare_param_data_types(
293        mut self,
294        prepare_param_data_types: Vec<FieldRef>,
295    ) -> Self {
296        self.prepare_param_data_types = prepare_param_data_types.into();
297        self
298    }
299
300    // Return a reference to the outer query's schema
301    pub fn outer_query_schema(&self) -> Option<&DFSchema> {
302        self.outer_query_schema.as_ref().map(|s| s.as_ref())
303    }
304
305    /// Sets the outer query schema, returning the existing one, if
306    /// any
307    pub fn set_outer_query_schema(
308        &mut self,
309        mut schema: Option<DFSchemaRef>,
310    ) -> Option<DFSchemaRef> {
311        std::mem::swap(&mut self.outer_query_schema, &mut schema);
312        schema
313    }
314
315    pub fn set_table_schema(
316        &mut self,
317        mut schema: Option<DFSchemaRef>,
318    ) -> Option<DFSchemaRef> {
319        std::mem::swap(&mut self.create_table_schema, &mut schema);
320        schema
321    }
322
323    pub fn table_schema(&self) -> Option<DFSchemaRef> {
324        self.create_table_schema.clone()
325    }
326
327    // Return a clone of the outer FROM schema
328    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
329        self.outer_from_schema.clone()
330    }
331
332    /// Sets the outer FROM schema, returning the existing one, if any
333    pub fn set_outer_from_schema(
334        &mut self,
335        mut schema: Option<DFSchemaRef>,
336    ) -> Option<DFSchemaRef> {
337        std::mem::swap(&mut self.outer_from_schema, &mut schema);
338        schema
339    }
340
341    /// Extends the FROM schema, returning the existing one, if any
342    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
343        match self.outer_from_schema.as_mut() {
344            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
345            None => self.outer_from_schema = Some(Arc::clone(schema)),
346        };
347        Ok(())
348    }
349
350    /// Return the types of parameters (`$1`, `$2`, etc) if known
351    pub fn prepare_param_data_types(&self) -> &[FieldRef] {
352        &self.prepare_param_data_types
353    }
354
355    /// Returns true if there is a Common Table Expression (CTE) /
356    /// Subquery for the specified name
357    pub fn contains_cte(&self, cte_name: &str) -> bool {
358        self.ctes.contains_key(cte_name)
359    }
360
361    /// Inserts a LogicalPlan for the Common Table Expression (CTE) /
362    /// Subquery for the specified name
363    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
364        let cte_name = cte_name.into();
365        self.ctes.insert(cte_name, Arc::new(plan));
366    }
367
368    /// Return a plan for the Common Table Expression (CTE) / Subquery for the
369    /// specified name
370    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
371        self.ctes.get(cte_name).map(|cte| cte.as_ref())
372    }
373
374    /// Remove the plan of CTE / Subquery for the specified name
375    pub(super) fn remove_cte(&mut self, cte_name: &str) {
376        self.ctes.remove(cte_name);
377    }
378}
379
/// SQL query planner and binder
///
/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
///
/// You can control the behavior of the planner by providing [`ParserOptions`].
///
/// It performs the following tasks:
///
/// 1. Name and type resolution (called "binding" in other systems). This
///    phase looks up table and column names using the [`ContextProvider`].
/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
///
/// It does not perform type coercion, or perform optimization, which are done
/// by subsequent passes.
///
/// Key interfaces are:
/// * [`Self::sql_statement_to_plan`]: Convert a statement
///   (e.g. `SELECT ...`) into a [`LogicalPlan`]
/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
pub struct SqlToRel<'a, S: ContextProvider> {
    /// Source of configuration options and of the optional type planner.
    pub(crate) context_provider: &'a S,
    /// Parser options in effect for this planner.
    pub(crate) options: ParserOptions,
    /// Identifier normalizer configured from
    /// `options.enable_ident_normalization`.
    pub(crate) ident_normalizer: IdentNormalizer,
}
404
405impl<'a, S: ContextProvider> SqlToRel<'a, S> {
406    /// Create a new query planner.
407    ///
408    /// The query planner derives the parser options from the context provider.
409    pub fn new(context_provider: &'a S) -> Self {
410        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
411        Self::new_with_options(context_provider, parser_options)
412    }
413
414    /// Create a new query planner with the given parser options.
415    ///
416    /// The query planner ignores the parser options from the context provider
417    /// and uses the given parser options instead.
418    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
419        let ident_normalize = options.enable_ident_normalization;
420
421        SqlToRel {
422            context_provider,
423            options,
424            ident_normalizer: IdentNormalizer::new(ident_normalize),
425        }
426    }
427
428    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
429        let mut fields = Vec::with_capacity(columns.len());
430
431        for column in columns {
432            let data_type = self.convert_data_type_to_field(&column.data_type)?;
433            let not_nullable = column
434                .options
435                .iter()
436                .any(|x| x.option == ColumnOption::NotNull);
437            fields.push(
438                data_type
439                    .as_ref()
440                    .clone()
441                    .with_name(self.ident_normalizer.normalize(column.name))
442                    .with_nullable(!not_nullable),
443            );
444        }
445
446        Ok(Schema::new(fields))
447    }
448
449    /// Returns a vector of (column_name, default_expr) pairs
450    pub(super) fn build_column_defaults(
451        &self,
452        columns: &Vec<SQLColumnDef>,
453        planner_context: &mut PlannerContext,
454    ) -> Result<Vec<(String, Expr)>> {
455        let mut column_defaults = vec![];
456        // Default expressions are restricted, column references are not allowed
457        let empty_schema = DFSchema::empty();
458        let error_desc = |e: DataFusionError| match e {
459            DataFusionError::SchemaError(ref err, _)
460                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
461            {
462                plan_datafusion_err!(
463                    "Column reference is not allowed in the DEFAULT expression : {}",
464                    e
465                )
466            }
467            _ => e,
468        };
469
470        for column in columns {
471            if let Some(default_sql_expr) =
472                column.options.iter().find_map(|o| match &o.option {
473                    ColumnOption::Default(expr) => Some(expr),
474                    _ => None,
475                })
476            {
477                let default_expr = self
478                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
479                    .map_err(error_desc)?;
480                column_defaults.push((
481                    self.ident_normalizer.normalize(column.name.clone()),
482                    default_expr,
483                ));
484            }
485        }
486        Ok(column_defaults)
487    }
488
489    /// Apply the given TableAlias to the input plan
490    pub(crate) fn apply_table_alias(
491        &self,
492        plan: LogicalPlan,
493        alias: TableAlias,
494    ) -> Result<LogicalPlan> {
495        let idents = alias.columns.into_iter().map(|c| c.name).collect();
496        let plan = self.apply_expr_alias(plan, idents)?;
497
498        LogicalPlanBuilder::from(plan)
499            .alias(TableReference::bare(
500                self.ident_normalizer.normalize(alias.name),
501            ))?
502            .build()
503    }
504
505    pub(crate) fn apply_expr_alias(
506        &self,
507        plan: LogicalPlan,
508        idents: Vec<Ident>,
509    ) -> Result<LogicalPlan> {
510        if idents.is_empty() {
511            Ok(plan)
512        } else if idents.len() != plan.schema().fields().len() {
513            plan_err!(
514                "Source table contains {} columns but only {} \
515                names given as column alias",
516                plan.schema().fields().len(),
517                idents.len()
518            )
519        } else {
520            let fields = plan.schema().fields().clone();
521            LogicalPlanBuilder::from(plan)
522                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
523                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
524                }))?
525                .build()
526        }
527    }
528
529    /// Validate the schema provides all of the columns referenced in the expressions.
530    pub(crate) fn validate_schema_satisfies_exprs(
531        &self,
532        schema: &DFSchema,
533        exprs: &[Expr],
534    ) -> Result<()> {
535        find_column_exprs(exprs)
536            .iter()
537            .try_for_each(|col| match col {
538                Expr::Column(col) => match &col.relation {
539                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
540                    None => {
541                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
542                            Ok(())
543                        } else {
544                            Err(field_not_found(
545                                col.relation.clone(),
546                                col.name.as_str(),
547                                schema,
548                            ))
549                        }
550                    }
551                }
552                .map_err(|err: DataFusionError| match &err {
553                    DataFusionError::SchemaError(inner, _)
554                        if matches!(
555                            inner.as_ref(),
556                            SchemaError::FieldNotFound { .. }
557                        ) =>
558                    {
559                        let SchemaError::FieldNotFound {
560                            field,
561                            valid_fields,
562                        } = inner.as_ref()
563                        else {
564                            unreachable!()
565                        };
566                        let mut diagnostic = if let Some(relation) = &col.relation {
567                            Diagnostic::new_error(
568                                format!(
569                                    "column '{}' not found in '{}'",
570                                    &col.name, relation
571                                ),
572                                col.spans().first(),
573                            )
574                        } else {
575                            Diagnostic::new_error(
576                                format!("column '{}' not found", &col.name),
577                                col.spans().first(),
578                            )
579                        };
580                        add_possible_columns_to_diag(
581                            &mut diagnostic,
582                            field,
583                            valid_fields,
584                        );
585                        err.with_diagnostic(diagnostic)
586                    }
587                    _ => err,
588                }),
589                _ => internal_err!("Not a column"),
590            })
591    }
592
593    pub(crate) fn convert_data_type_to_field(
594        &self,
595        sql_type: &SQLDataType,
596    ) -> Result<FieldRef> {
597        // First check if any of the registered type_planner can handle this type
598        if let Some(type_planner) = self.context_provider.get_type_planner()
599            && let Some(data_type) = type_planner.plan_type(sql_type)?
600        {
601            return Ok(data_type.into_nullable_field_ref());
602        }
603
604        // If no type_planner can handle this type, use the default conversion
605        match sql_type {
606            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
607                // Arrays may be multi-dimensional.
608                Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
609            }
610            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
611                inner_sql_type,
612                maybe_array_size,
613            )) => {
614                let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
615                if let Some(array_size) = maybe_array_size {
616                    let array_size: i32 = (*array_size).try_into().map_err(|_| {
617                        plan_datafusion_err!(
618                            "Array size must be a positive 32 bit integer, got {array_size}"
619                        )
620                    })?;
621                    Ok(inner_field.into_fixed_size_list(array_size))
622                } else {
623                    Ok(inner_field.into_list())
624                }
625            }
626            SQLDataType::Array(ArrayElemTypeDef::None) => {
627                not_impl_err!("Arrays with unspecified type is not supported")
628            }
629            other => Ok(self
630                .convert_simple_data_type(other)?
631                .into_nullable_field_ref()),
632        }
633    }
634
635    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
636        match sql_type {
637            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
638            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
639            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
640            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
641                Ok(DataType::Int32)
642            }
643            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
644            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
645            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
646                Ok(DataType::UInt16)
647            }
648            SQLDataType::IntUnsigned(_)
649            | SQLDataType::IntegerUnsigned(_)
650            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
651            SQLDataType::Varchar(length) => {
652                match (length, self.options.support_varchar_with_length) {
653                    (Some(_), false) => plan_err!(
654                        "does not support Varchar with length, \
655                    please set `support_varchar_with_length` to be true"
656                    ),
657                    _ => {
658                        if self.options.map_string_types_to_utf8view {
659                            Ok(DataType::Utf8View)
660                        } else {
661                            Ok(DataType::Utf8)
662                        }
663                    }
664                }
665            }
666            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
667                Ok(DataType::UInt64)
668            }
669            SQLDataType::Float(_) => Ok(DataType::Float32),
670            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
671            SQLDataType::Double(ExactNumberInfo::None)
672            | SQLDataType::DoublePrecision
673            | SQLDataType::Float8 => Ok(DataType::Float64),
674            SQLDataType::Double(
675                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
676            ) => {
677                not_impl_err!(
678                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
679                )
680            }
681            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
682                if self.options.map_string_types_to_utf8view {
683                    Ok(DataType::Utf8View)
684                } else {
685                    Ok(DataType::Utf8)
686                }
687            }
688            SQLDataType::Timestamp(precision, tz_info)
689                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
690            {
691                let tz = if matches!(tz_info, TimezoneInfo::Tz)
692                    || matches!(tz_info, TimezoneInfo::WithTimeZone)
693                {
694                    // Timestamp With Time Zone
695                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
696                    // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
697                    self.context_provider.options().execution.time_zone.clone()
698                } else {
699                    // Timestamp Without Time zone
700                    None
701                };
702                let precision = match precision {
703                    Some(0) => TimeUnit::Second,
704                    Some(3) => TimeUnit::Millisecond,
705                    Some(6) => TimeUnit::Microsecond,
706                    None | Some(9) => TimeUnit::Nanosecond,
707                    _ => unreachable!(),
708                };
709                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
710            }
711            SQLDataType::Date => Ok(DataType::Date32),
712            SQLDataType::Time(None, tz_info) => {
713                if matches!(tz_info, TimezoneInfo::None)
714                    || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
715                {
716                    Ok(DataType::Time64(TimeUnit::Nanosecond))
717                } else {
718                    // We don't support TIMETZ and TIME WITH TIME ZONE for now
719                    not_impl_err!("Unsupported SQL type {sql_type}")
720                }
721            }
722            SQLDataType::Numeric(exact_number_info)
723            | SQLDataType::Decimal(exact_number_info) => {
724                let (precision, scale) = match *exact_number_info {
725                    ExactNumberInfo::None => (None, None),
726                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
727                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
728                        (Some(precision), Some(scale))
729                    }
730                };
731                make_decimal_type(precision, scale.map(|s| s as u64))
732            }
733            SQLDataType::Bytea => Ok(DataType::Binary),
734            SQLDataType::Interval { fields, precision } => {
735                if fields.is_some() || precision.is_some() {
736                    return not_impl_err!("Unsupported SQL type {sql_type}");
737                }
738                Ok(DataType::Interval(IntervalUnit::MonthDayNano))
739            }
740            SQLDataType::Struct(fields, _) => {
741                let fields = fields
742                    .iter()
743                    .enumerate()
744                    .map(|(idx, sql_struct_field)| {
745                        let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
746                        let field_name = match &sql_struct_field.field_name {
747                            Some(ident) => ident.clone(),
748                            None => Ident::new(format!("c{idx}")),
749                        };
750                        Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
751                    })
752                    .collect::<Result<Vec<_>>>()?;
753                Ok(DataType::Struct(Fields::from(fields)))
754            }
755            SQLDataType::Nvarchar(_)
756            | SQLDataType::JSON
757            | SQLDataType::Uuid
758            | SQLDataType::Binary(_)
759            | SQLDataType::Varbinary(_)
760            | SQLDataType::Blob(_)
761            | SQLDataType::Datetime(_)
762            | SQLDataType::Regclass
763            | SQLDataType::Custom(_, _)
764            | SQLDataType::Array(_)
765            | SQLDataType::Enum(_, _)
766            | SQLDataType::Set(_)
767            | SQLDataType::MediumInt(_)
768            | SQLDataType::MediumIntUnsigned(_)
769            | SQLDataType::Character(_)
770            | SQLDataType::CharacterVarying(_)
771            | SQLDataType::CharVarying(_)
772            | SQLDataType::CharacterLargeObject(_)
773            | SQLDataType::CharLargeObject(_)
774            | SQLDataType::Timestamp(_, _)
775            | SQLDataType::Time(Some(_), _)
776            | SQLDataType::Dec(_)
777            | SQLDataType::BigNumeric(_)
778            | SQLDataType::BigDecimal(_)
779            | SQLDataType::Clob(_)
780            | SQLDataType::Bytes(_)
781            | SQLDataType::Int64
782            | SQLDataType::Float64
783            | SQLDataType::JSONB
784            | SQLDataType::Unspecified
785            | SQLDataType::Int16
786            | SQLDataType::Int32
787            | SQLDataType::Int128
788            | SQLDataType::Int256
789            | SQLDataType::UInt8
790            | SQLDataType::UInt16
791            | SQLDataType::UInt32
792            | SQLDataType::UInt64
793            | SQLDataType::UInt128
794            | SQLDataType::UInt256
795            | SQLDataType::Float32
796            | SQLDataType::Date32
797            | SQLDataType::Datetime64(_, _)
798            | SQLDataType::FixedString(_)
799            | SQLDataType::Map(_, _)
800            | SQLDataType::Tuple(_)
801            | SQLDataType::Nested(_)
802            | SQLDataType::Union(_)
803            | SQLDataType::Nullable(_)
804            | SQLDataType::LowCardinality(_)
805            | SQLDataType::Trigger
806            | SQLDataType::TinyBlob
807            | SQLDataType::MediumBlob
808            | SQLDataType::LongBlob
809            | SQLDataType::TinyText
810            | SQLDataType::MediumText
811            | SQLDataType::LongText
812            | SQLDataType::Bit(_)
813            | SQLDataType::BitVarying(_)
814            | SQLDataType::Signed
815            | SQLDataType::SignedInteger
816            | SQLDataType::Unsigned
817            | SQLDataType::UnsignedInteger
818            | SQLDataType::AnyType
819            | SQLDataType::Table(_)
820            | SQLDataType::VarBit(_)
821            | SQLDataType::UTinyInt
822            | SQLDataType::USmallInt
823            | SQLDataType::HugeInt
824            | SQLDataType::UHugeInt
825            | SQLDataType::UBigInt
826            | SQLDataType::TimestampNtz
827            | SQLDataType::NamedTable { .. }
828            | SQLDataType::TsVector
829            | SQLDataType::TsQuery
830            | SQLDataType::GeometricType(_)
831            | SQLDataType::DecimalUnsigned(_) // deprecated mysql type
832            | SQLDataType::FloatUnsigned(_) // deprecated mysql type
833            | SQLDataType::RealUnsigned // deprecated mysql type
834            | SQLDataType::DecUnsigned(_) // deprecated mysql type
835            | SQLDataType::DoubleUnsigned(_) // deprecated mysql type
836            | SQLDataType::DoublePrecisionUnsigned // deprecated mysql type
837            => {
838                not_impl_err!("Unsupported SQL type {sql_type}")
839            }
840        }
841    }
842
843    pub(crate) fn object_name_to_table_reference(
844        &self,
845        object_name: ObjectName,
846    ) -> Result<TableReference> {
847        object_name_to_table_reference(
848            object_name,
849            self.options.enable_ident_normalization,
850        )
851    }
852}
853
854/// Create a [`TableReference`] after normalizing the specified ObjectName
855///
856/// Examples
857/// ```text
858/// ['foo']          -> Bare { table: "foo" }
859/// ['"foo.bar"]]    -> Bare { table: "foo.bar" }
860/// ['foo', 'Bar']   -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
861/// ['foo', 'bar']   -> Partial { schema: "foo", table: "bar" }
862/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
863/// ```
864pub fn object_name_to_table_reference(
865    object_name: ObjectName,
866    enable_normalization: bool,
867) -> Result<TableReference> {
868    // Use destructure to make it clear no fields on ObjectName are ignored
869    let ObjectName(object_name_parts) = object_name;
870    let idents = object_name_parts
871        .into_iter()
872        .map(|object_name_part| {
873            object_name_part.as_ident().cloned().ok_or_else(|| {
874                plan_datafusion_err!(
875                    "Expected identifier, but found: {:?}",
876                    object_name_part
877                )
878            })
879        })
880        .collect::<Result<Vec<_>>>()?;
881    idents_to_table_reference(idents, enable_normalization)
882}
883
884struct IdentTaker {
885    normalizer: IdentNormalizer,
886    idents: Vec<Ident>,
887}
888
889/// Take the next identifier from the back of idents, panic'ing if
890/// there are none left
891impl IdentTaker {
892    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
893        Self {
894            normalizer: IdentNormalizer::new(enable_normalization),
895            idents,
896        }
897    }
898
899    fn take(&mut self) -> String {
900        let ident = self.idents.pop().expect("no more identifiers");
901        self.normalizer.normalize(ident)
902    }
903
904    /// Returns the number of remaining identifiers
905    fn len(&self) -> usize {
906        self.idents.len()
907    }
908}
909
910// impl Display for a nicer error message
911impl std::fmt::Display for IdentTaker {
912    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
913        let mut first = true;
914        for ident in self.idents.iter() {
915            if !first {
916                write!(f, ".")?;
917            }
918            write!(f, "{ident}")?;
919            first = false;
920        }
921
922        Ok(())
923    }
924}
925
926/// Create a [`TableReference`] after normalizing the specified identifier
927pub(crate) fn idents_to_table_reference(
928    idents: Vec<Ident>,
929    enable_normalization: bool,
930) -> Result<TableReference> {
931    let mut taker = IdentTaker::new(idents, enable_normalization);
932
933    match taker.len() {
934        1 => {
935            let table = taker.take();
936            Ok(TableReference::bare(table))
937        }
938        2 => {
939            let table = taker.take();
940            let schema = taker.take();
941            Ok(TableReference::partial(schema, table))
942        }
943        3 => {
944            let table = taker.take();
945            let schema = taker.take();
946            let catalog = taker.take();
947            Ok(TableReference::full(catalog, schema, table))
948        }
949        _ => plan_err!(
950            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
951            taker,
952            taker.len()
953        ),
954    }
955}
956
957/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
958/// from the provided object identifiers (catalog, schema and table names).
959pub fn object_name_to_qualifier(
960    sql_table_name: &ObjectName,
961    enable_normalization: bool,
962) -> Result<String> {
963    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
964    let normalizer = IdentNormalizer::new(enable_normalization);
965    sql_table_name
966        .0
967        .iter()
968        .rev()
969        .zip(columns)
970        .map(|(object_name_part, column_name)| {
971            object_name_part
972                .as_ident()
973                .map(|ident| {
974                    format!(
975                        r#"{} = '{}'"#,
976                        column_name,
977                        normalizer.normalize(ident.clone())
978                    )
979                })
980                .ok_or_else(|| {
981                    plan_datafusion_err!(
982                        "Expected identifier, but found: {:?}",
983                        object_name_part
984                    )
985                })
986        })
987        .collect::<Result<Vec<_>>>()
988        .map(|parts| parts.join(" AND "))
989}