datafusion_sql/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
19use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use arrow::datatypes::*;
25use datafusion_common::config::SqlParserOptions;
26use datafusion_common::error::add_possible_columns_to_diag;
27use datafusion_common::TableReference;
28use datafusion_common::{
29    field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, Diagnostic,
30    SchemaError,
31};
32use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
33use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
34use datafusion_expr::utils::find_column_exprs;
35use datafusion_expr::{col, Expr};
36use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
37use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
38use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
39
40use crate::utils::make_decimal_type;
41pub use datafusion_expr::planner::ContextProvider;
42
/// SQL parser options
///
/// Controls how SQL text is interpreted while it is planned. Build a value
/// with [`ParserOptions::new`] (or `Default`) and adjust individual settings
/// through the `with_*` builder methods.
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Whether to parse float as decimal.
    pub parse_float_as_decimal: bool,
    /// Whether to normalize identifiers.
    pub enable_ident_normalization: bool,
    /// Whether to support varchar with length.
    pub support_varchar_with_length: bool,
    /// Whether to normalize options value.
    pub enable_options_value_normalization: bool,
    /// Whether to collect spans
    pub collect_spans: bool,
    /// Whether string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
    /// When `false`, those types are mapped to `Utf8` instead.
    pub map_string_types_to_utf8view: bool,
    /// Default null ordering for sorting expressions.
    pub default_null_ordering: NullOrdering,
}
61
62impl ParserOptions {
63    /// Creates a new `ParserOptions` instance with default values.
64    ///
65    /// # Examples
66    ///
67    /// ```
68    /// use datafusion_sql::planner::ParserOptions;
69    /// let opts = ParserOptions::new();
70    /// assert_eq!(opts.parse_float_as_decimal, false);
71    /// assert_eq!(opts.enable_ident_normalization, true);
72    /// ```
73    pub fn new() -> Self {
74        Self {
75            parse_float_as_decimal: false,
76            enable_ident_normalization: true,
77            support_varchar_with_length: true,
78            map_string_types_to_utf8view: true,
79            enable_options_value_normalization: false,
80            collect_spans: false,
81            // By default, `nulls_max` is used to follow Postgres's behavior.
82            // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
83            default_null_ordering: NullOrdering::NullsMax,
84        }
85    }
86
87    /// Sets the `parse_float_as_decimal` option.
88    ///
89    /// # Examples
90    ///
91    /// ```
92    /// use datafusion_sql::planner::ParserOptions;
93    /// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
94    /// assert_eq!(opts.parse_float_as_decimal, true);
95    /// ```
96    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
97        self.parse_float_as_decimal = value;
98        self
99    }
100
101    /// Sets the `enable_ident_normalization` option.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use datafusion_sql::planner::ParserOptions;
107    /// let opts = ParserOptions::new().with_enable_ident_normalization(false);
108    /// assert_eq!(opts.enable_ident_normalization, false);
109    /// ```
110    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
111        self.enable_ident_normalization = value;
112        self
113    }
114
115    /// Sets the `support_varchar_with_length` option.
116    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
117        self.support_varchar_with_length = value;
118        self
119    }
120
121    /// Sets the `map_string_types_to_utf8view` option.
122    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
123        self.map_string_types_to_utf8view = value;
124        self
125    }
126
127    /// Sets the `enable_options_value_normalization` option.
128    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
129        self.enable_options_value_normalization = value;
130        self
131    }
132
133    /// Sets the `collect_spans` option.
134    pub fn with_collect_spans(mut self, value: bool) -> Self {
135        self.collect_spans = value;
136        self
137    }
138
139    /// Sets the `default_null_ordering` option.
140    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
141        self.default_null_ordering = value;
142        self
143    }
144}
145
146impl Default for ParserOptions {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl From<&SqlParserOptions> for ParserOptions {
153    fn from(options: &SqlParserOptions) -> Self {
154        Self {
155            parse_float_as_decimal: options.parse_float_as_decimal,
156            enable_ident_normalization: options.enable_ident_normalization,
157            support_varchar_with_length: options.support_varchar_with_length,
158            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
159            enable_options_value_normalization: options
160                .enable_options_value_normalization,
161            collect_spans: options.collect_spans,
162            default_null_ordering: options.default_null_ordering.as_str().into(),
163        }
164    }
165}
166
/// Represents the null ordering for sorting expressions.
///
/// See [`NullOrdering::nulls_first`] for how each variant resolves to a
/// concrete "nulls first" / "nulls last" decision for a given sort direction.
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
    /// Nulls sort as if they were the largest value: they appear last in
    /// ascending order and first in descending order.
    NullsMax,
    /// Nulls sort as if they were the smallest value: they appear first in
    /// ascending order and last in descending order.
    NullsMin,
    /// Nulls appear first, regardless of sort direction.
    NullsFirst,
    /// Nulls appear last, regardless of sort direction.
    NullsLast,
}
179
180impl NullOrdering {
181    /// Evaluates the null ordering based on the given ascending flag.
182    ///
183    /// # Returns
184    /// * `true` if nulls should appear first.
185    /// * `false` if nulls should appear last.
186    pub fn nulls_first(&self, asc: bool) -> bool {
187        match self {
188            Self::NullsMax => !asc,
189            Self::NullsMin => asc,
190            Self::NullsFirst => true,
191            Self::NullsLast => false,
192        }
193    }
194}
195
196impl FromStr for NullOrdering {
197    type Err = DataFusionError;
198
199    fn from_str(s: &str) -> Result<Self> {
200        match s {
201            "nulls_max" => Ok(Self::NullsMax),
202            "nulls_min" => Ok(Self::NullsMin),
203            "nulls_first" => Ok(Self::NullsFirst),
204            "nulls_last" => Ok(Self::NullsLast),
205            _ => plan_err!("Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"),
206        }
207    }
208}
209
210impl From<&str> for NullOrdering {
211    fn from(s: &str) -> Self {
212        Self::from_str(s).unwrap_or(Self::NullsMax)
213    }
214}
215
/// Ident Normalizer
///
/// Turns a SQL [`Ident`] into the `String` name used by the planner, either
/// normalizing it or passing its value through unchanged.
#[derive(Debug)]
pub struct IdentNormalizer {
    /// When `true`, identifiers are normalized; when `false`, the
    /// identifier's `value` is used as written.
    normalize: bool,
}
221
222impl Default for IdentNormalizer {
223    fn default() -> Self {
224        Self { normalize: true }
225    }
226}
227
228impl IdentNormalizer {
229    pub fn new(normalize: bool) -> Self {
230        Self { normalize }
231    }
232
233    pub fn normalize(&self, ident: Ident) -> String {
234        if self.normalize {
235            crate::utils::normalize_ident(ident)
236        } else {
237            ident.value
238        }
239    }
240}
241
/// Struct to store the states used by the Planner. The Planner will leverage the states
/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expressions (CTEs) provided with a WITH clause, the
/// parameter data types provided with a PREPARE statement, and the query schema of the
/// outer query plan.
///
/// # Cloning
///
/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned.
/// This helps resolve scoping issues of CTEs.
/// By using cloning, a subquery can inherit CTEs from the outer query
/// and can also define its own private CTEs without affecting the outer query.
#[derive(Debug, Clone)]
pub struct PlannerContext {
    /// Data types for numbered parameters ($1, $2, etc), if supplied
    /// in `PREPARE` statement
    prepare_param_data_types: Arc<Vec<DataType>>,
    /// Map of CTE name to logical plan of the WITH clause.
    /// Use `Arc<LogicalPlan>` to allow cheap cloning
    ctes: HashMap<String, Arc<LogicalPlan>>,
    /// The query schema of the outer query plan, used to resolve the columns in subquery
    outer_query_schema: Option<DFSchemaRef>,
    /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
    /// FROM clauses, this should become a suffix of the `outer_query_schema`.
    outer_from_schema: Option<DFSchemaRef>,
    /// The query schema defined by the table; stored by `set_table_schema`
    /// and read back by `table_schema`.
    create_table_schema: Option<DFSchemaRef>,
}
271
272impl Default for PlannerContext {
273    fn default() -> Self {
274        Self::new()
275    }
276}
277
278impl PlannerContext {
279    /// Create an empty PlannerContext
280    pub fn new() -> Self {
281        Self {
282            prepare_param_data_types: Arc::new(vec![]),
283            ctes: HashMap::new(),
284            outer_query_schema: None,
285            outer_from_schema: None,
286            create_table_schema: None,
287        }
288    }
289
290    /// Update the PlannerContext with provided prepare_param_data_types
291    pub fn with_prepare_param_data_types(
292        mut self,
293        prepare_param_data_types: Vec<DataType>,
294    ) -> Self {
295        self.prepare_param_data_types = prepare_param_data_types.into();
296        self
297    }
298
299    // Return a reference to the outer query's schema
300    pub fn outer_query_schema(&self) -> Option<&DFSchema> {
301        self.outer_query_schema.as_ref().map(|s| s.as_ref())
302    }
303
304    /// Sets the outer query schema, returning the existing one, if
305    /// any
306    pub fn set_outer_query_schema(
307        &mut self,
308        mut schema: Option<DFSchemaRef>,
309    ) -> Option<DFSchemaRef> {
310        std::mem::swap(&mut self.outer_query_schema, &mut schema);
311        schema
312    }
313
314    pub fn set_table_schema(
315        &mut self,
316        mut schema: Option<DFSchemaRef>,
317    ) -> Option<DFSchemaRef> {
318        std::mem::swap(&mut self.create_table_schema, &mut schema);
319        schema
320    }
321
322    pub fn table_schema(&self) -> Option<DFSchemaRef> {
323        self.create_table_schema.clone()
324    }
325
326    // Return a clone of the outer FROM schema
327    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
328        self.outer_from_schema.clone()
329    }
330
331    /// Sets the outer FROM schema, returning the existing one, if any
332    pub fn set_outer_from_schema(
333        &mut self,
334        mut schema: Option<DFSchemaRef>,
335    ) -> Option<DFSchemaRef> {
336        std::mem::swap(&mut self.outer_from_schema, &mut schema);
337        schema
338    }
339
340    /// Extends the FROM schema, returning the existing one, if any
341    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
342        match self.outer_from_schema.as_mut() {
343            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
344            None => self.outer_from_schema = Some(Arc::clone(schema)),
345        };
346        Ok(())
347    }
348
349    /// Return the types of parameters (`$1`, `$2`, etc) if known
350    pub fn prepare_param_data_types(&self) -> &[DataType] {
351        &self.prepare_param_data_types
352    }
353
354    /// Returns true if there is a Common Table Expression (CTE) /
355    /// Subquery for the specified name
356    pub fn contains_cte(&self, cte_name: &str) -> bool {
357        self.ctes.contains_key(cte_name)
358    }
359
360    /// Inserts a LogicalPlan for the Common Table Expression (CTE) /
361    /// Subquery for the specified name
362    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
363        let cte_name = cte_name.into();
364        self.ctes.insert(cte_name, Arc::new(plan));
365    }
366
367    /// Return a plan for the Common Table Expression (CTE) / Subquery for the
368    /// specified name
369    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
370        self.ctes.get(cte_name).map(|cte| cte.as_ref())
371    }
372
373    /// Remove the plan of CTE / Subquery for the specified name
374    pub(super) fn remove_cte(&mut self, cte_name: &str) {
375        self.ctes.remove(cte_name);
376    }
377}
378
/// SQL query planner and binder
///
/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
///
/// You can control the behavior of the planner by providing [`ParserOptions`].
///
/// It performs the following tasks:
///
/// 1. Name and type resolution (called "binding" in other systems). This
///    phase looks up table and column names using the [`ContextProvider`].
/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
///
/// It does not perform type coercion, or perform optimization, which are done
/// by subsequent passes.
///
/// Key interfaces are:
/// * [`Self::sql_statement_to_plan`]: Convert a statement
///   (e.g. `SELECT ...`) into a [`LogicalPlan`]
/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
pub struct SqlToRel<'a, S: ContextProvider> {
    /// Borrowed provider consulted for configuration options and for an
    /// optional custom type planner.
    pub(crate) context_provider: &'a S,
    /// Parser options in effect for this planner.
    pub(crate) options: ParserOptions,
    /// Identifier normalizer, configured from
    /// `options.enable_ident_normalization` at construction time.
    pub(crate) ident_normalizer: IdentNormalizer,
}
403
404impl<'a, S: ContextProvider> SqlToRel<'a, S> {
405    /// Create a new query planner.
406    ///
407    /// The query planner derives the parser options from the context provider.
408    pub fn new(context_provider: &'a S) -> Self {
409        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
410        Self::new_with_options(context_provider, parser_options)
411    }
412
413    /// Create a new query planner with the given parser options.
414    ///
415    /// The query planner ignores the parser options from the context provider
416    /// and uses the given parser options instead.
417    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
418        let ident_normalize = options.enable_ident_normalization;
419
420        SqlToRel {
421            context_provider,
422            options,
423            ident_normalizer: IdentNormalizer::new(ident_normalize),
424        }
425    }
426
427    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
428        let mut fields = Vec::with_capacity(columns.len());
429
430        for column in columns {
431            let data_type = self.convert_data_type(&column.data_type)?;
432            let not_nullable = column
433                .options
434                .iter()
435                .any(|x| x.option == ColumnOption::NotNull);
436            fields.push(Field::new(
437                self.ident_normalizer.normalize(column.name),
438                data_type,
439                !not_nullable,
440            ));
441        }
442
443        Ok(Schema::new(fields))
444    }
445
446    /// Returns a vector of (column_name, default_expr) pairs
447    pub(super) fn build_column_defaults(
448        &self,
449        columns: &Vec<SQLColumnDef>,
450        planner_context: &mut PlannerContext,
451    ) -> Result<Vec<(String, Expr)>> {
452        let mut column_defaults = vec![];
453        // Default expressions are restricted, column references are not allowed
454        let empty_schema = DFSchema::empty();
455        let error_desc = |e: DataFusionError| match e {
456            DataFusionError::SchemaError(ref err, _)
457                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
458            {
459                plan_datafusion_err!(
460                    "Column reference is not allowed in the DEFAULT expression : {}",
461                    e
462                )
463            }
464            _ => e,
465        };
466
467        for column in columns {
468            if let Some(default_sql_expr) =
469                column.options.iter().find_map(|o| match &o.option {
470                    ColumnOption::Default(expr) => Some(expr),
471                    _ => None,
472                })
473            {
474                let default_expr = self
475                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
476                    .map_err(error_desc)?;
477                column_defaults.push((
478                    self.ident_normalizer.normalize(column.name.clone()),
479                    default_expr,
480                ));
481            }
482        }
483        Ok(column_defaults)
484    }
485
486    /// Apply the given TableAlias to the input plan
487    pub(crate) fn apply_table_alias(
488        &self,
489        plan: LogicalPlan,
490        alias: TableAlias,
491    ) -> Result<LogicalPlan> {
492        let idents = alias.columns.into_iter().map(|c| c.name).collect();
493        let plan = self.apply_expr_alias(plan, idents)?;
494
495        LogicalPlanBuilder::from(plan)
496            .alias(TableReference::bare(
497                self.ident_normalizer.normalize(alias.name),
498            ))?
499            .build()
500    }
501
502    pub(crate) fn apply_expr_alias(
503        &self,
504        plan: LogicalPlan,
505        idents: Vec<Ident>,
506    ) -> Result<LogicalPlan> {
507        if idents.is_empty() {
508            Ok(plan)
509        } else if idents.len() != plan.schema().fields().len() {
510            plan_err!(
511                "Source table contains {} columns but only {} \
512                names given as column alias",
513                plan.schema().fields().len(),
514                idents.len()
515            )
516        } else {
517            let fields = plan.schema().fields().clone();
518            LogicalPlanBuilder::from(plan)
519                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
520                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
521                }))?
522                .build()
523        }
524    }
525
526    /// Validate the schema provides all of the columns referenced in the expressions.
527    pub(crate) fn validate_schema_satisfies_exprs(
528        &self,
529        schema: &DFSchema,
530        exprs: &[Expr],
531    ) -> Result<()> {
532        find_column_exprs(exprs)
533            .iter()
534            .try_for_each(|col| match col {
535                Expr::Column(col) => match &col.relation {
536                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
537                    None => {
538                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
539                            Ok(())
540                        } else {
541                            Err(field_not_found(
542                                col.relation.clone(),
543                                col.name.as_str(),
544                                schema,
545                            ))
546                        }
547                    }
548                }
549                .map_err(|err: DataFusionError| match &err {
550                    DataFusionError::SchemaError(inner, _)
551                        if matches!(
552                            inner.as_ref(),
553                            SchemaError::FieldNotFound { .. }
554                        ) =>
555                    {
556                        let SchemaError::FieldNotFound {
557                            field,
558                            valid_fields,
559                        } = inner.as_ref()
560                        else {
561                            unreachable!()
562                        };
563                        let mut diagnostic = if let Some(relation) = &col.relation {
564                            Diagnostic::new_error(
565                                format!(
566                                    "column '{}' not found in '{}'",
567                                    &col.name, relation
568                                ),
569                                col.spans().first(),
570                            )
571                        } else {
572                            Diagnostic::new_error(
573                                format!("column '{}' not found", &col.name),
574                                col.spans().first(),
575                            )
576                        };
577                        add_possible_columns_to_diag(
578                            &mut diagnostic,
579                            field,
580                            valid_fields,
581                        );
582                        err.with_diagnostic(diagnostic)
583                    }
584                    _ => err,
585                }),
586                _ => internal_err!("Not a column"),
587            })
588    }
589
590    pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
591        // First check if any of the registered type_planner can handle this type
592        if let Some(type_planner) = self.context_provider.get_type_planner() {
593            if let Some(data_type) = type_planner.plan_type(sql_type)? {
594                return Ok(data_type);
595            }
596        }
597
598        // If no type_planner can handle this type, use the default conversion
599        match sql_type {
600            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
601                // Arrays may be multi-dimensional.
602                let inner_data_type = self.convert_data_type(inner_sql_type)?;
603                Ok(DataType::new_list(inner_data_type, true))
604            }
605            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
606                inner_sql_type,
607                maybe_array_size,
608            )) => {
609                let inner_data_type = self.convert_data_type(inner_sql_type)?;
610                if let Some(array_size) = maybe_array_size {
611                    Ok(DataType::new_fixed_size_list(
612                        inner_data_type,
613                        *array_size as i32,
614                        true,
615                    ))
616                } else {
617                    Ok(DataType::new_list(inner_data_type, true))
618                }
619            }
620            SQLDataType::Array(ArrayElemTypeDef::None) => {
621                not_impl_err!("Arrays with unspecified type is not supported")
622            }
623            other => self.convert_simple_data_type(other),
624        }
625    }
626
627    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
628        match sql_type {
629            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
630            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
631            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
632            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
633                Ok(DataType::Int32)
634            }
635            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
636            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
637            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
638                Ok(DataType::UInt16)
639            }
640            SQLDataType::IntUnsigned(_)
641            | SQLDataType::IntegerUnsigned(_)
642            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
643            SQLDataType::Varchar(length) => {
644                match (length, self.options.support_varchar_with_length) {
645                    (Some(_), false) => plan_err!(
646                        "does not support Varchar with length, \
647                    please set `support_varchar_with_length` to be true"
648                    ),
649                    _ => {
650                        if self.options.map_string_types_to_utf8view {
651                            Ok(DataType::Utf8View)
652                        } else {
653                            Ok(DataType::Utf8)
654                        }
655                    }
656                }
657            }
658            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
659                Ok(DataType::UInt64)
660            }
661            SQLDataType::Float(_) => Ok(DataType::Float32),
662            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
663            SQLDataType::Double(ExactNumberInfo::None)
664            | SQLDataType::DoublePrecision
665            | SQLDataType::Float8 => Ok(DataType::Float64),
666            SQLDataType::Double(
667                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
668            ) => {
669                not_impl_err!(
670                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
671                )
672            }
673            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
674                if self.options.map_string_types_to_utf8view {
675                    Ok(DataType::Utf8View)
676                } else {
677                    Ok(DataType::Utf8)
678                }
679            }
680            SQLDataType::Timestamp(precision, tz_info)
681                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
682            {
683                let tz = if matches!(tz_info, TimezoneInfo::Tz)
684                    || matches!(tz_info, TimezoneInfo::WithTimeZone)
685                {
686                    // Timestamp With Time Zone
687                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
688                    // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
689                    Some(self.context_provider.options().execution.time_zone.clone())
690                } else {
691                    // Timestamp Without Time zone
692                    None
693                };
694                let precision = match precision {
695                    Some(0) => TimeUnit::Second,
696                    Some(3) => TimeUnit::Millisecond,
697                    Some(6) => TimeUnit::Microsecond,
698                    None | Some(9) => TimeUnit::Nanosecond,
699                    _ => unreachable!(),
700                };
701                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
702            }
703            SQLDataType::Date => Ok(DataType::Date32),
704            SQLDataType::Time(None, tz_info) => {
705                if matches!(tz_info, TimezoneInfo::None)
706                    || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
707                {
708                    Ok(DataType::Time64(TimeUnit::Nanosecond))
709                } else {
710                    // We don't support TIMETZ and TIME WITH TIME ZONE for now
711                    not_impl_err!("Unsupported SQL type {sql_type:?}")
712                }
713            }
714            SQLDataType::Numeric(exact_number_info)
715            | SQLDataType::Decimal(exact_number_info) => {
716                let (precision, scale) = match *exact_number_info {
717                    ExactNumberInfo::None => (None, None),
718                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
719                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
720                        (Some(precision), Some(scale))
721                    }
722                };
723                make_decimal_type(precision, scale)
724            }
725            SQLDataType::Bytea => Ok(DataType::Binary),
726            SQLDataType::Interval => Ok(DataType::Interval(IntervalUnit::MonthDayNano)),
727            SQLDataType::Struct(fields, _) => {
728                let fields = fields
729                    .iter()
730                    .enumerate()
731                    .map(|(idx, field)| {
732                        let data_type = self.convert_data_type(&field.field_type)?;
733                        let field_name = match &field.field_name {
734                            Some(ident) => ident.clone(),
735                            None => Ident::new(format!("c{idx}")),
736                        };
737                        Ok(Arc::new(Field::new(
738                            self.ident_normalizer.normalize(field_name),
739                            data_type,
740                            true,
741                        )))
742                    })
743                    .collect::<Result<Vec<_>>>()?;
744                Ok(DataType::Struct(Fields::from(fields)))
745            }
746            SQLDataType::Nvarchar(_)
747            | SQLDataType::JSON
748            | SQLDataType::Uuid
749            | SQLDataType::Binary(_)
750            | SQLDataType::Varbinary(_)
751            | SQLDataType::Blob(_)
752            | SQLDataType::Datetime(_)
753            | SQLDataType::Regclass
754            | SQLDataType::Custom(_, _)
755            | SQLDataType::Array(_)
756            | SQLDataType::Enum(_, _)
757            | SQLDataType::Set(_)
758            | SQLDataType::MediumInt(_)
759            | SQLDataType::MediumIntUnsigned(_)
760            | SQLDataType::Character(_)
761            | SQLDataType::CharacterVarying(_)
762            | SQLDataType::CharVarying(_)
763            | SQLDataType::CharacterLargeObject(_)
764            | SQLDataType::CharLargeObject(_)
765            | SQLDataType::Timestamp(_, _)
766            | SQLDataType::Time(Some(_), _)
767            | SQLDataType::Dec(_)
768            | SQLDataType::BigNumeric(_)
769            | SQLDataType::BigDecimal(_)
770            | SQLDataType::Clob(_)
771            | SQLDataType::Bytes(_)
772            | SQLDataType::Int64
773            | SQLDataType::Float64
774            | SQLDataType::JSONB
775            | SQLDataType::Unspecified
776            | SQLDataType::Int16
777            | SQLDataType::Int32
778            | SQLDataType::Int128
779            | SQLDataType::Int256
780            | SQLDataType::UInt8
781            | SQLDataType::UInt16
782            | SQLDataType::UInt32
783            | SQLDataType::UInt64
784            | SQLDataType::UInt128
785            | SQLDataType::UInt256
786            | SQLDataType::Float32
787            | SQLDataType::Date32
788            | SQLDataType::Datetime64(_, _)
789            | SQLDataType::FixedString(_)
790            | SQLDataType::Map(_, _)
791            | SQLDataType::Tuple(_)
792            | SQLDataType::Nested(_)
793            | SQLDataType::Union(_)
794            | SQLDataType::Nullable(_)
795            | SQLDataType::LowCardinality(_)
796            | SQLDataType::Trigger
797            | SQLDataType::TinyBlob
798            | SQLDataType::MediumBlob
799            | SQLDataType::LongBlob
800            | SQLDataType::TinyText
801            | SQLDataType::MediumText
802            | SQLDataType::LongText
803            | SQLDataType::Bit(_)
804            | SQLDataType::BitVarying(_)
805            | SQLDataType::Signed
806            | SQLDataType::SignedInteger
807            | SQLDataType::Unsigned
808            | SQLDataType::UnsignedInteger
809            | SQLDataType::AnyType
810            | SQLDataType::Table(_)
811            | SQLDataType::VarBit(_)
812            | SQLDataType::UTinyInt
813            | SQLDataType::USmallInt
814            | SQLDataType::HugeInt
815            | SQLDataType::UHugeInt
816            | SQLDataType::UBigInt
817            | SQLDataType::TimestampNtz
818            | SQLDataType::NamedTable { .. }
819            | SQLDataType::TsVector
820            | SQLDataType::TsQuery
821            | SQLDataType::GeometricType(_) => {
822                not_impl_err!("Unsupported SQL type {sql_type:?}")
823            }
824        }
825    }
826
827    pub(crate) fn object_name_to_table_reference(
828        &self,
829        object_name: ObjectName,
830    ) -> Result<TableReference> {
831        object_name_to_table_reference(
832            object_name,
833            self.options.enable_ident_normalization,
834        )
835    }
836}
837
838/// Create a [`TableReference`] after normalizing the specified ObjectName
839///
840/// Examples
841/// ```text
842/// ['foo']          -> Bare { table: "foo" }
843/// ['"foo.bar"]]    -> Bare { table: "foo.bar" }
844/// ['foo', 'Bar']   -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
845/// ['foo', 'bar']   -> Partial { schema: "foo", table: "bar" }
846/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
847/// ```
848pub fn object_name_to_table_reference(
849    object_name: ObjectName,
850    enable_normalization: bool,
851) -> Result<TableReference> {
852    // Use destructure to make it clear no fields on ObjectName are ignored
853    let ObjectName(object_name_parts) = object_name;
854    let idents = object_name_parts
855        .into_iter()
856        .map(|object_name_part| {
857            object_name_part.as_ident().cloned().ok_or_else(|| {
858                plan_datafusion_err!(
859                    "Expected identifier, but found: {:?}",
860                    object_name_part
861                )
862            })
863        })
864        .collect::<Result<Vec<_>>>()?;
865    idents_to_table_reference(idents, enable_normalization)
866}
867
/// Holds the identifiers of a compound name together with the normalizer
/// applied to each one as it is taken.
struct IdentTaker {
    /// Normalizer applied to every identifier returned by `take`
    normalizer: IdentNormalizer,
    /// Identifiers not yet taken; `take` removes them from the back
    idents: Vec<Ident>,
}
872
873/// Take the next identifier from the back of idents, panic'ing if
874/// there are none left
875impl IdentTaker {
876    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
877        Self {
878            normalizer: IdentNormalizer::new(enable_normalization),
879            idents,
880        }
881    }
882
883    fn take(&mut self) -> String {
884        let ident = self.idents.pop().expect("no more identifiers");
885        self.normalizer.normalize(ident)
886    }
887
888    /// Returns the number of remaining identifiers
889    fn len(&self) -> usize {
890        self.idents.len()
891    }
892}
893
894// impl Display for a nicer error message
895impl std::fmt::Display for IdentTaker {
896    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
897        let mut first = true;
898        for ident in self.idents.iter() {
899            if !first {
900                write!(f, ".")?;
901            }
902            write!(f, "{ident}")?;
903            first = false;
904        }
905
906        Ok(())
907    }
908}
909
910/// Create a [`TableReference`] after normalizing the specified identifier
911pub(crate) fn idents_to_table_reference(
912    idents: Vec<Ident>,
913    enable_normalization: bool,
914) -> Result<TableReference> {
915    let mut taker = IdentTaker::new(idents, enable_normalization);
916
917    match taker.len() {
918        1 => {
919            let table = taker.take();
920            Ok(TableReference::bare(table))
921        }
922        2 => {
923            let table = taker.take();
924            let schema = taker.take();
925            Ok(TableReference::partial(schema, table))
926        }
927        3 => {
928            let table = taker.take();
929            let schema = taker.take();
930            let catalog = taker.take();
931            Ok(TableReference::full(catalog, schema, table))
932        }
933        _ => plan_err!(
934            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
935            taker,
936            taker.len()
937        ),
938    }
939}
940
941/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
942/// from the provided object identifiers (catalog, schema and table names).
943pub fn object_name_to_qualifier(
944    sql_table_name: &ObjectName,
945    enable_normalization: bool,
946) -> Result<String> {
947    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
948    let normalizer = IdentNormalizer::new(enable_normalization);
949    sql_table_name
950        .0
951        .iter()
952        .rev()
953        .zip(columns)
954        .map(|(object_name_part, column_name)| {
955            object_name_part
956                .as_ident()
957                .map(|ident| {
958                    format!(
959                        r#"{} = '{}'"#,
960                        column_name,
961                        normalizer.normalize(ident.clone())
962                    )
963                })
964                .ok_or_else(|| {
965                    plan_datafusion_err!(
966                        "Expected identifier, but found: {:?}",
967                        object_name_part
968                    )
969                })
970        })
971        .collect::<Result<Vec<_>>>()
972        .map(|parts| parts.join(" AND "))
973}