datafusion_sql/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
19use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::config::SqlParserOptions;
27use datafusion_common::datatype::{DataTypeExt, FieldExt};
28use datafusion_common::error::add_possible_columns_to_diag;
29use datafusion_common::TableReference;
30use datafusion_common::{
31    field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, Diagnostic,
32    SchemaError,
33};
34use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
35use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
36pub use datafusion_expr::planner::ContextProvider;
37use datafusion_expr::utils::find_column_exprs;
38use datafusion_expr::{col, Expr};
39use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
40use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
41use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
42
43/// SQL parser options
44#[derive(Debug, Clone, Copy)]
45pub struct ParserOptions {
46    /// Whether to parse float as decimal.
47    pub parse_float_as_decimal: bool,
48    /// Whether to normalize identifiers.
49    pub enable_ident_normalization: bool,
50    /// Whether to support varchar with length.
51    pub support_varchar_with_length: bool,
52    /// Whether to normalize options value.
53    pub enable_options_value_normalization: bool,
54    /// Whether to collect spans
55    pub collect_spans: bool,
56    /// Whether string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
57    pub map_string_types_to_utf8view: bool,
58    /// Default null ordering for sorting expressions.
59    pub default_null_ordering: NullOrdering,
60}
61
62impl ParserOptions {
63    /// Creates a new `ParserOptions` instance with default values.
64    ///
65    /// # Examples
66    ///
67    /// ```
68    /// use datafusion_sql::planner::ParserOptions;
69    /// let opts = ParserOptions::new();
70    /// assert_eq!(opts.parse_float_as_decimal, false);
71    /// assert_eq!(opts.enable_ident_normalization, true);
72    /// ```
73    pub fn new() -> Self {
74        Self {
75            parse_float_as_decimal: false,
76            enable_ident_normalization: true,
77            support_varchar_with_length: true,
78            map_string_types_to_utf8view: true,
79            enable_options_value_normalization: false,
80            collect_spans: false,
81            // By default, `nulls_max` is used to follow Postgres's behavior.
82            // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
83            default_null_ordering: NullOrdering::NullsMax,
84        }
85    }
86
87    /// Sets the `parse_float_as_decimal` option.
88    ///
89    /// # Examples
90    ///
91    /// ```
92    /// use datafusion_sql::planner::ParserOptions;
93    /// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
94    /// assert_eq!(opts.parse_float_as_decimal, true);
95    /// ```
96    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
97        self.parse_float_as_decimal = value;
98        self
99    }
100
101    /// Sets the `enable_ident_normalization` option.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use datafusion_sql::planner::ParserOptions;
107    /// let opts = ParserOptions::new().with_enable_ident_normalization(false);
108    /// assert_eq!(opts.enable_ident_normalization, false);
109    /// ```
110    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
111        self.enable_ident_normalization = value;
112        self
113    }
114
115    /// Sets the `support_varchar_with_length` option.
116    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
117        self.support_varchar_with_length = value;
118        self
119    }
120
121    /// Sets the `map_string_types_to_utf8view` option.
122    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
123        self.map_string_types_to_utf8view = value;
124        self
125    }
126
127    /// Sets the `enable_options_value_normalization` option.
128    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
129        self.enable_options_value_normalization = value;
130        self
131    }
132
133    /// Sets the `collect_spans` option.
134    pub fn with_collect_spans(mut self, value: bool) -> Self {
135        self.collect_spans = value;
136        self
137    }
138
139    /// Sets the `default_null_ordering` option.
140    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
141        self.default_null_ordering = value;
142        self
143    }
144}
145
146impl Default for ParserOptions {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl From<&SqlParserOptions> for ParserOptions {
153    fn from(options: &SqlParserOptions) -> Self {
154        Self {
155            parse_float_as_decimal: options.parse_float_as_decimal,
156            enable_ident_normalization: options.enable_ident_normalization,
157            support_varchar_with_length: options.support_varchar_with_length,
158            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
159            enable_options_value_normalization: options
160                .enable_options_value_normalization,
161            collect_spans: options.collect_spans,
162            default_null_ordering: options.default_null_ordering.as_str().into(),
163        }
164    }
165}
166
/// Represents the null ordering for sorting expressions.
///
/// `NullsMax` and `NullsMin` describe where NULL ranks relative to non-NULL
/// values, so the physical position of NULLs depends on the sort direction.
/// `NullsFirst` and `NullsLast` place NULLs independently of the direction.
/// See [`NullOrdering::nulls_first`] for the exact mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NullOrdering {
    /// Nulls rank as the largest value: they appear last in ascending order
    /// and first in descending order.
    NullsMax,
    /// Nulls rank as the smallest value: they appear first in ascending order
    /// and last in descending order.
    NullsMin,
    /// Nulls appear first, regardless of sort direction.
    NullsFirst,
    /// Nulls appear last, regardless of sort direction.
    NullsLast,
}
179
180impl NullOrdering {
181    /// Evaluates the null ordering based on the given ascending flag.
182    ///
183    /// # Returns
184    /// * `true` if nulls should appear first.
185    /// * `false` if nulls should appear last.
186    pub fn nulls_first(&self, asc: bool) -> bool {
187        match self {
188            Self::NullsMax => !asc,
189            Self::NullsMin => asc,
190            Self::NullsFirst => true,
191            Self::NullsLast => false,
192        }
193    }
194}
195
196impl FromStr for NullOrdering {
197    type Err = DataFusionError;
198
199    fn from_str(s: &str) -> Result<Self> {
200        match s {
201            "nulls_max" => Ok(Self::NullsMax),
202            "nulls_min" => Ok(Self::NullsMin),
203            "nulls_first" => Ok(Self::NullsFirst),
204            "nulls_last" => Ok(Self::NullsLast),
205            _ => plan_err!("Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"),
206        }
207    }
208}
209
210impl From<&str> for NullOrdering {
211    fn from(s: &str) -> Self {
212        Self::from_str(s).unwrap_or(Self::NullsMax)
213    }
214}
215
216/// Ident Normalizer
217#[derive(Debug)]
218pub struct IdentNormalizer {
219    normalize: bool,
220}
221
222impl Default for IdentNormalizer {
223    fn default() -> Self {
224        Self { normalize: true }
225    }
226}
227
228impl IdentNormalizer {
229    pub fn new(normalize: bool) -> Self {
230        Self { normalize }
231    }
232
233    pub fn normalize(&self, ident: Ident) -> String {
234        if self.normalize {
235            crate::utils::normalize_ident(ident)
236        } else {
237            ident.value
238        }
239    }
240}
241
242/// Struct to store the states used by the Planner. The Planner will leverage the states
243/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include
244/// Common Table Expression (CTE) provided with WITH clause and
245/// Parameter Data Types provided with PREPARE statement and the query schema of the
246/// outer query plan.
247///
248/// # Cloning
249///
250/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned.
251/// This helps resolve scoping issues of CTEs.
252/// By using cloning, a subquery can inherit CTEs from the outer query
253/// and can also define its own private CTEs without affecting the outer query.
254#[derive(Debug, Clone)]
255pub struct PlannerContext {
256    /// Data types for numbered parameters ($1, $2, etc), if supplied
257    /// in `PREPARE` statement
258    prepare_param_data_types: Arc<Vec<FieldRef>>,
259    /// Map of CTE name to logical plan of the WITH clause.
260    /// Use `Arc<LogicalPlan>` to allow cheap cloning
261    ctes: HashMap<String, Arc<LogicalPlan>>,
262    /// The query schema of the outer query plan, used to resolve the columns in subquery
263    outer_query_schema: Option<DFSchemaRef>,
264    /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
265    /// FROM clauses, this should become a suffix of the `outer_query_schema`.
266    outer_from_schema: Option<DFSchemaRef>,
267    /// The query schema defined by the table
268    create_table_schema: Option<DFSchemaRef>,
269}
270
271impl Default for PlannerContext {
272    fn default() -> Self {
273        Self::new()
274    }
275}
276
277impl PlannerContext {
278    /// Create an empty PlannerContext
279    pub fn new() -> Self {
280        Self {
281            prepare_param_data_types: Arc::new(vec![]),
282            ctes: HashMap::new(),
283            outer_query_schema: None,
284            outer_from_schema: None,
285            create_table_schema: None,
286        }
287    }
288
289    /// Update the PlannerContext with provided prepare_param_data_types
290    pub fn with_prepare_param_data_types(
291        mut self,
292        prepare_param_data_types: Vec<FieldRef>,
293    ) -> Self {
294        self.prepare_param_data_types = prepare_param_data_types.into();
295        self
296    }
297
298    // Return a reference to the outer query's schema
299    pub fn outer_query_schema(&self) -> Option<&DFSchema> {
300        self.outer_query_schema.as_ref().map(|s| s.as_ref())
301    }
302
303    /// Sets the outer query schema, returning the existing one, if
304    /// any
305    pub fn set_outer_query_schema(
306        &mut self,
307        mut schema: Option<DFSchemaRef>,
308    ) -> Option<DFSchemaRef> {
309        std::mem::swap(&mut self.outer_query_schema, &mut schema);
310        schema
311    }
312
313    pub fn set_table_schema(
314        &mut self,
315        mut schema: Option<DFSchemaRef>,
316    ) -> Option<DFSchemaRef> {
317        std::mem::swap(&mut self.create_table_schema, &mut schema);
318        schema
319    }
320
321    pub fn table_schema(&self) -> Option<DFSchemaRef> {
322        self.create_table_schema.clone()
323    }
324
325    // Return a clone of the outer FROM schema
326    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
327        self.outer_from_schema.clone()
328    }
329
330    /// Sets the outer FROM schema, returning the existing one, if any
331    pub fn set_outer_from_schema(
332        &mut self,
333        mut schema: Option<DFSchemaRef>,
334    ) -> Option<DFSchemaRef> {
335        std::mem::swap(&mut self.outer_from_schema, &mut schema);
336        schema
337    }
338
339    /// Extends the FROM schema, returning the existing one, if any
340    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
341        match self.outer_from_schema.as_mut() {
342            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
343            None => self.outer_from_schema = Some(Arc::clone(schema)),
344        };
345        Ok(())
346    }
347
348    /// Return the types of parameters (`$1`, `$2`, etc) if known
349    pub fn prepare_param_data_types(&self) -> &[FieldRef] {
350        &self.prepare_param_data_types
351    }
352
353    /// Returns true if there is a Common Table Expression (CTE) /
354    /// Subquery for the specified name
355    pub fn contains_cte(&self, cte_name: &str) -> bool {
356        self.ctes.contains_key(cte_name)
357    }
358
359    /// Inserts a LogicalPlan for the Common Table Expression (CTE) /
360    /// Subquery for the specified name
361    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
362        let cte_name = cte_name.into();
363        self.ctes.insert(cte_name, Arc::new(plan));
364    }
365
366    /// Return a plan for the Common Table Expression (CTE) / Subquery for the
367    /// specified name
368    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
369        self.ctes.get(cte_name).map(|cte| cte.as_ref())
370    }
371
372    /// Remove the plan of CTE / Subquery for the specified name
373    pub(super) fn remove_cte(&mut self, cte_name: &str) {
374        self.ctes.remove(cte_name);
375    }
376}
377
378/// SQL query planner and binder
379///
380/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
381///
382/// You can control the behavior of the planner by providing [`ParserOptions`].
383///
384/// It performs the following tasks:
385///
386/// 1. Name and type resolution (called "binding" in other systems). This
387///    phase looks up table and column names using the [`ContextProvider`].
388/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
389///
390/// It does not perform type coercion, or perform optimization, which are done
391/// by subsequent passes.
392///
393/// Key interfaces are:
394/// * [`Self::sql_statement_to_plan`]: Convert a statement
395///   (e.g. `SELECT ...`) into a [`LogicalPlan`]
396/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
397pub struct SqlToRel<'a, S: ContextProvider> {
398    pub(crate) context_provider: &'a S,
399    pub(crate) options: ParserOptions,
400    pub(crate) ident_normalizer: IdentNormalizer,
401}
402
403impl<'a, S: ContextProvider> SqlToRel<'a, S> {
404    /// Create a new query planner.
405    ///
406    /// The query planner derives the parser options from the context provider.
407    pub fn new(context_provider: &'a S) -> Self {
408        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
409        Self::new_with_options(context_provider, parser_options)
410    }
411
412    /// Create a new query planner with the given parser options.
413    ///
414    /// The query planner ignores the parser options from the context provider
415    /// and uses the given parser options instead.
416    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
417        let ident_normalize = options.enable_ident_normalization;
418
419        SqlToRel {
420            context_provider,
421            options,
422            ident_normalizer: IdentNormalizer::new(ident_normalize),
423        }
424    }
425
426    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
427        let mut fields = Vec::with_capacity(columns.len());
428
429        for column in columns {
430            let data_type = self.convert_data_type_to_field(&column.data_type)?;
431            let not_nullable = column
432                .options
433                .iter()
434                .any(|x| x.option == ColumnOption::NotNull);
435            fields.push(
436                data_type
437                    .as_ref()
438                    .clone()
439                    .with_name(self.ident_normalizer.normalize(column.name))
440                    .with_nullable(!not_nullable),
441            );
442        }
443
444        Ok(Schema::new(fields))
445    }
446
447    /// Returns a vector of (column_name, default_expr) pairs
448    pub(super) fn build_column_defaults(
449        &self,
450        columns: &Vec<SQLColumnDef>,
451        planner_context: &mut PlannerContext,
452    ) -> Result<Vec<(String, Expr)>> {
453        let mut column_defaults = vec![];
454        // Default expressions are restricted, column references are not allowed
455        let empty_schema = DFSchema::empty();
456        let error_desc = |e: DataFusionError| match e {
457            DataFusionError::SchemaError(ref err, _)
458                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
459            {
460                plan_datafusion_err!(
461                    "Column reference is not allowed in the DEFAULT expression : {}",
462                    e
463                )
464            }
465            _ => e,
466        };
467
468        for column in columns {
469            if let Some(default_sql_expr) =
470                column.options.iter().find_map(|o| match &o.option {
471                    ColumnOption::Default(expr) => Some(expr),
472                    _ => None,
473                })
474            {
475                let default_expr = self
476                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
477                    .map_err(error_desc)?;
478                column_defaults.push((
479                    self.ident_normalizer.normalize(column.name.clone()),
480                    default_expr,
481                ));
482            }
483        }
484        Ok(column_defaults)
485    }
486
487    /// Apply the given TableAlias to the input plan
488    pub(crate) fn apply_table_alias(
489        &self,
490        plan: LogicalPlan,
491        alias: TableAlias,
492    ) -> Result<LogicalPlan> {
493        let idents = alias.columns.into_iter().map(|c| c.name).collect();
494        let plan = self.apply_expr_alias(plan, idents)?;
495
496        LogicalPlanBuilder::from(plan)
497            .alias(TableReference::bare(
498                self.ident_normalizer.normalize(alias.name),
499            ))?
500            .build()
501    }
502
503    pub(crate) fn apply_expr_alias(
504        &self,
505        plan: LogicalPlan,
506        idents: Vec<Ident>,
507    ) -> Result<LogicalPlan> {
508        if idents.is_empty() {
509            Ok(plan)
510        } else if idents.len() != plan.schema().fields().len() {
511            plan_err!(
512                "Source table contains {} columns but only {} \
513                names given as column alias",
514                plan.schema().fields().len(),
515                idents.len()
516            )
517        } else {
518            let fields = plan.schema().fields().clone();
519            LogicalPlanBuilder::from(plan)
520                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
521                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
522                }))?
523                .build()
524        }
525    }
526
527    /// Validate the schema provides all of the columns referenced in the expressions.
528    pub(crate) fn validate_schema_satisfies_exprs(
529        &self,
530        schema: &DFSchema,
531        exprs: &[Expr],
532    ) -> Result<()> {
533        find_column_exprs(exprs)
534            .iter()
535            .try_for_each(|col| match col {
536                Expr::Column(col) => match &col.relation {
537                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
538                    None => {
539                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
540                            Ok(())
541                        } else {
542                            Err(field_not_found(
543                                col.relation.clone(),
544                                col.name.as_str(),
545                                schema,
546                            ))
547                        }
548                    }
549                }
550                .map_err(|err: DataFusionError| match &err {
551                    DataFusionError::SchemaError(inner, _)
552                        if matches!(
553                            inner.as_ref(),
554                            SchemaError::FieldNotFound { .. }
555                        ) =>
556                    {
557                        let SchemaError::FieldNotFound {
558                            field,
559                            valid_fields,
560                        } = inner.as_ref()
561                        else {
562                            unreachable!()
563                        };
564                        let mut diagnostic = if let Some(relation) = &col.relation {
565                            Diagnostic::new_error(
566                                format!(
567                                    "column '{}' not found in '{}'",
568                                    &col.name, relation
569                                ),
570                                col.spans().first(),
571                            )
572                        } else {
573                            Diagnostic::new_error(
574                                format!("column '{}' not found", &col.name),
575                                col.spans().first(),
576                            )
577                        };
578                        add_possible_columns_to_diag(
579                            &mut diagnostic,
580                            field,
581                            valid_fields,
582                        );
583                        err.with_diagnostic(diagnostic)
584                    }
585                    _ => err,
586                }),
587                _ => internal_err!("Not a column"),
588            })
589    }
590
591    pub(crate) fn convert_data_type_to_field(
592        &self,
593        sql_type: &SQLDataType,
594    ) -> Result<FieldRef> {
595        // First check if any of the registered type_planner can handle this type
596        if let Some(type_planner) = self.context_provider.get_type_planner() {
597            if let Some(data_type) = type_planner.plan_type(sql_type)? {
598                return Ok(data_type.into_nullable_field_ref());
599            }
600        }
601
602        // If no type_planner can handle this type, use the default conversion
603        match sql_type {
604            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
605                // Arrays may be multi-dimensional.
606                Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
607            }
608            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
609                inner_sql_type,
610                maybe_array_size,
611            )) => {
612                let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
613                if let Some(array_size) = maybe_array_size {
614                    let array_size: i32 = (*array_size).try_into().map_err(|_| {
615                        plan_datafusion_err!(
616                            "Array size must be a positive 32 bit integer, got {array_size}"
617                        )
618                    })?;
619                    Ok(inner_field.into_fixed_size_list(array_size))
620                } else {
621                    Ok(inner_field.into_list())
622                }
623            }
624            SQLDataType::Array(ArrayElemTypeDef::None) => {
625                not_impl_err!("Arrays with unspecified type is not supported")
626            }
627            other => Ok(self
628                .convert_simple_data_type(other)?
629                .into_nullable_field_ref()),
630        }
631    }
632
633    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
634        match sql_type {
635            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
636            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
637            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
638            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
639                Ok(DataType::Int32)
640            }
641            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
642            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
643            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
644                Ok(DataType::UInt16)
645            }
646            SQLDataType::IntUnsigned(_)
647            | SQLDataType::IntegerUnsigned(_)
648            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
649            SQLDataType::Varchar(length) => {
650                match (length, self.options.support_varchar_with_length) {
651                    (Some(_), false) => plan_err!(
652                        "does not support Varchar with length, \
653                    please set `support_varchar_with_length` to be true"
654                    ),
655                    _ => {
656                        if self.options.map_string_types_to_utf8view {
657                            Ok(DataType::Utf8View)
658                        } else {
659                            Ok(DataType::Utf8)
660                        }
661                    }
662                }
663            }
664            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
665                Ok(DataType::UInt64)
666            }
667            SQLDataType::Float(_) => Ok(DataType::Float32),
668            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
669            SQLDataType::Double(ExactNumberInfo::None)
670            | SQLDataType::DoublePrecision
671            | SQLDataType::Float8 => Ok(DataType::Float64),
672            SQLDataType::Double(
673                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
674            ) => {
675                not_impl_err!(
676                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
677                )
678            }
679            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
680                if self.options.map_string_types_to_utf8view {
681                    Ok(DataType::Utf8View)
682                } else {
683                    Ok(DataType::Utf8)
684                }
685            }
686            SQLDataType::Timestamp(precision, tz_info)
687                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
688            {
689                let tz = if matches!(tz_info, TimezoneInfo::Tz)
690                    || matches!(tz_info, TimezoneInfo::WithTimeZone)
691                {
692                    // Timestamp With Time Zone
693                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
694                    // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
695                    self.context_provider.options().execution.time_zone.clone()
696                } else {
697                    // Timestamp Without Time zone
698                    None
699                };
700                let precision = match precision {
701                    Some(0) => TimeUnit::Second,
702                    Some(3) => TimeUnit::Millisecond,
703                    Some(6) => TimeUnit::Microsecond,
704                    None | Some(9) => TimeUnit::Nanosecond,
705                    _ => unreachable!(),
706                };
707                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
708            }
709            SQLDataType::Date => Ok(DataType::Date32),
710            SQLDataType::Time(None, tz_info) => {
711                if matches!(tz_info, TimezoneInfo::None)
712                    || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
713                {
714                    Ok(DataType::Time64(TimeUnit::Nanosecond))
715                } else {
716                    // We don't support TIMETZ and TIME WITH TIME ZONE for now
717                    not_impl_err!("Unsupported SQL type {sql_type}")
718                }
719            }
720            SQLDataType::Numeric(exact_number_info)
721            | SQLDataType::Decimal(exact_number_info) => {
722                let (precision, scale) = match *exact_number_info {
723                    ExactNumberInfo::None => (None, None),
724                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
725                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
726                        (Some(precision), Some(scale))
727                    }
728                };
729                make_decimal_type(precision, scale.map(|s| s as u64))
730            }
731            SQLDataType::Bytea => Ok(DataType::Binary),
732            SQLDataType::Interval { fields, precision } => {
733                if fields.is_some() || precision.is_some() {
734                    return not_impl_err!("Unsupported SQL type {sql_type}");
735                }
736                Ok(DataType::Interval(IntervalUnit::MonthDayNano))
737            }
738            SQLDataType::Struct(fields, _) => {
739                let fields = fields
740                    .iter()
741                    .enumerate()
742                    .map(|(idx, sql_struct_field)| {
743                        let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
744                        let field_name = match &sql_struct_field.field_name {
745                            Some(ident) => ident.clone(),
746                            None => Ident::new(format!("c{idx}")),
747                        };
748                        Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
749                    })
750                    .collect::<Result<Vec<_>>>()?;
751                Ok(DataType::Struct(Fields::from(fields)))
752            }
753            SQLDataType::Nvarchar(_)
754            | SQLDataType::JSON
755            | SQLDataType::Uuid
756            | SQLDataType::Binary(_)
757            | SQLDataType::Varbinary(_)
758            | SQLDataType::Blob(_)
759            | SQLDataType::Datetime(_)
760            | SQLDataType::Regclass
761            | SQLDataType::Custom(_, _)
762            | SQLDataType::Array(_)
763            | SQLDataType::Enum(_, _)
764            | SQLDataType::Set(_)
765            | SQLDataType::MediumInt(_)
766            | SQLDataType::MediumIntUnsigned(_)
767            | SQLDataType::Character(_)
768            | SQLDataType::CharacterVarying(_)
769            | SQLDataType::CharVarying(_)
770            | SQLDataType::CharacterLargeObject(_)
771            | SQLDataType::CharLargeObject(_)
772            | SQLDataType::Timestamp(_, _)
773            | SQLDataType::Time(Some(_), _)
774            | SQLDataType::Dec(_)
775            | SQLDataType::BigNumeric(_)
776            | SQLDataType::BigDecimal(_)
777            | SQLDataType::Clob(_)
778            | SQLDataType::Bytes(_)
779            | SQLDataType::Int64
780            | SQLDataType::Float64
781            | SQLDataType::JSONB
782            | SQLDataType::Unspecified
783            | SQLDataType::Int16
784            | SQLDataType::Int32
785            | SQLDataType::Int128
786            | SQLDataType::Int256
787            | SQLDataType::UInt8
788            | SQLDataType::UInt16
789            | SQLDataType::UInt32
790            | SQLDataType::UInt64
791            | SQLDataType::UInt128
792            | SQLDataType::UInt256
793            | SQLDataType::Float32
794            | SQLDataType::Date32
795            | SQLDataType::Datetime64(_, _)
796            | SQLDataType::FixedString(_)
797            | SQLDataType::Map(_, _)
798            | SQLDataType::Tuple(_)
799            | SQLDataType::Nested(_)
800            | SQLDataType::Union(_)
801            | SQLDataType::Nullable(_)
802            | SQLDataType::LowCardinality(_)
803            | SQLDataType::Trigger
804            | SQLDataType::TinyBlob
805            | SQLDataType::MediumBlob
806            | SQLDataType::LongBlob
807            | SQLDataType::TinyText
808            | SQLDataType::MediumText
809            | SQLDataType::LongText
810            | SQLDataType::Bit(_)
811            | SQLDataType::BitVarying(_)
812            | SQLDataType::Signed
813            | SQLDataType::SignedInteger
814            | SQLDataType::Unsigned
815            | SQLDataType::UnsignedInteger
816            | SQLDataType::AnyType
817            | SQLDataType::Table(_)
818            | SQLDataType::VarBit(_)
819            | SQLDataType::UTinyInt
820            | SQLDataType::USmallInt
821            | SQLDataType::HugeInt
822            | SQLDataType::UHugeInt
823            | SQLDataType::UBigInt
824            | SQLDataType::TimestampNtz
825            | SQLDataType::NamedTable { .. }
826            | SQLDataType::TsVector
827            | SQLDataType::TsQuery
828            | SQLDataType::GeometricType(_)
829            | SQLDataType::DecimalUnsigned(_) // deprecated mysql type
830            | SQLDataType::FloatUnsigned(_) // deprecated mysql type
831            | SQLDataType::RealUnsigned // deprecated mysql type
832            | SQLDataType::DecUnsigned(_) // deprecated mysql type
833            | SQLDataType::DoubleUnsigned(_) // deprecated mysql type
834            | SQLDataType::DoublePrecisionUnsigned // deprecated mysql type
835            => {
836                not_impl_err!("Unsupported SQL type {sql_type}")
837            }
838        }
839    }
840
841    pub(crate) fn object_name_to_table_reference(
842        &self,
843        object_name: ObjectName,
844    ) -> Result<TableReference> {
845        object_name_to_table_reference(
846            object_name,
847            self.options.enable_ident_normalization,
848        )
849    }
850}
851
852/// Create a [`TableReference`] after normalizing the specified ObjectName
853///
854/// Examples
855/// ```text
856/// ['foo']          -> Bare { table: "foo" }
857/// ['"foo.bar"]]    -> Bare { table: "foo.bar" }
858/// ['foo', 'Bar']   -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
859/// ['foo', 'bar']   -> Partial { schema: "foo", table: "bar" }
860/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
861/// ```
862pub fn object_name_to_table_reference(
863    object_name: ObjectName,
864    enable_normalization: bool,
865) -> Result<TableReference> {
866    // Use destructure to make it clear no fields on ObjectName are ignored
867    let ObjectName(object_name_parts) = object_name;
868    let idents = object_name_parts
869        .into_iter()
870        .map(|object_name_part| {
871            object_name_part.as_ident().cloned().ok_or_else(|| {
872                plan_datafusion_err!(
873                    "Expected identifier, but found: {:?}",
874                    object_name_part
875                )
876            })
877        })
878        .collect::<Result<Vec<_>>>()?;
879    idents_to_table_reference(idents, enable_normalization)
880}
881
882struct IdentTaker {
883    normalizer: IdentNormalizer,
884    idents: Vec<Ident>,
885}
886
887/// Take the next identifier from the back of idents, panic'ing if
888/// there are none left
889impl IdentTaker {
890    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
891        Self {
892            normalizer: IdentNormalizer::new(enable_normalization),
893            idents,
894        }
895    }
896
897    fn take(&mut self) -> String {
898        let ident = self.idents.pop().expect("no more identifiers");
899        self.normalizer.normalize(ident)
900    }
901
902    /// Returns the number of remaining identifiers
903    fn len(&self) -> usize {
904        self.idents.len()
905    }
906}
907
908// impl Display for a nicer error message
909impl std::fmt::Display for IdentTaker {
910    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
911        let mut first = true;
912        for ident in self.idents.iter() {
913            if !first {
914                write!(f, ".")?;
915            }
916            write!(f, "{ident}")?;
917            first = false;
918        }
919
920        Ok(())
921    }
922}
923
924/// Create a [`TableReference`] after normalizing the specified identifier
925pub(crate) fn idents_to_table_reference(
926    idents: Vec<Ident>,
927    enable_normalization: bool,
928) -> Result<TableReference> {
929    let mut taker = IdentTaker::new(idents, enable_normalization);
930
931    match taker.len() {
932        1 => {
933            let table = taker.take();
934            Ok(TableReference::bare(table))
935        }
936        2 => {
937            let table = taker.take();
938            let schema = taker.take();
939            Ok(TableReference::partial(schema, table))
940        }
941        3 => {
942            let table = taker.take();
943            let schema = taker.take();
944            let catalog = taker.take();
945            Ok(TableReference::full(catalog, schema, table))
946        }
947        _ => plan_err!(
948            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
949            taker,
950            taker.len()
951        ),
952    }
953}
954
955/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
956/// from the provided object identifiers (catalog, schema and table names).
957pub fn object_name_to_qualifier(
958    sql_table_name: &ObjectName,
959    enable_normalization: bool,
960) -> Result<String> {
961    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
962    let normalizer = IdentNormalizer::new(enable_normalization);
963    sql_table_name
964        .0
965        .iter()
966        .rev()
967        .zip(columns)
968        .map(|(object_name_part, column_name)| {
969            object_name_part
970                .as_ident()
971                .map(|ident| {
972                    format!(
973                        r#"{} = '{}'"#,
974                        column_name,
975                        normalizer.normalize(ident.clone())
976                    )
977                })
978                .ok_or_else(|| {
979                    plan_datafusion_err!(
980                        "Expected identifier, but found: {:?}",
981                        object_name_part
982                    )
983                })
984        })
985        .collect::<Result<Vec<_>>>()
986        .map(|parts| parts.join(" AND "))
987}