datafusion_sql/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::vec;
22
23use arrow::datatypes::*;
24use datafusion_common::config::SqlParserOptions;
25use datafusion_common::error::add_possible_columns_to_diag;
26use datafusion_common::TableReference;
27use datafusion_common::{
28    field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, Diagnostic,
29    SchemaError,
30};
31use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
32use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
33use datafusion_expr::utils::find_column_exprs;
34use datafusion_expr::{col, Expr};
35use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
36use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
37use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
38
39use crate::utils::make_decimal_type;
40pub use datafusion_expr::planner::ContextProvider;
41
/// Options that control how SQL is parsed and planned.
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Whether to parse float as decimal.
    pub parse_float_as_decimal: bool,
    /// Whether to normalize identifiers.
    pub enable_ident_normalization: bool,
    /// Whether to support varchar with length.
    pub support_varchar_with_length: bool,
    /// Whether to normalize options value.
    pub enable_options_value_normalization: bool,
    /// Whether to collect spans
    pub collect_spans: bool,
    /// Whether string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
    pub map_string_types_to_utf8view: bool,
}

impl ParserOptions {
    /// Builds a `ParserOptions` populated with the default value of every option.
    ///
    /// # Examples
    ///
    /// ```
    /// use datafusion_sql::planner::ParserOptions;
    /// let opts = ParserOptions::new();
    /// assert_eq!(opts.parse_float_as_decimal, false);
    /// assert_eq!(opts.enable_ident_normalization, true);
    /// ```
    pub fn new() -> Self {
        Self {
            parse_float_as_decimal: false,
            enable_ident_normalization: true,
            support_varchar_with_length: true,
            enable_options_value_normalization: false,
            collect_spans: false,
            map_string_types_to_utf8view: true,
        }
    }

    /// Returns a copy of these options with `parse_float_as_decimal` replaced.
    ///
    /// # Examples
    ///
    /// ```
    /// use datafusion_sql::planner::ParserOptions;
    /// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
    /// assert_eq!(opts.parse_float_as_decimal, true);
    /// ```
    pub fn with_parse_float_as_decimal(self, value: bool) -> Self {
        Self {
            parse_float_as_decimal: value,
            ..self
        }
    }

    /// Returns a copy of these options with `enable_ident_normalization` replaced.
    ///
    /// # Examples
    ///
    /// ```
    /// use datafusion_sql::planner::ParserOptions;
    /// let opts = ParserOptions::new().with_enable_ident_normalization(false);
    /// assert_eq!(opts.enable_ident_normalization, false);
    /// ```
    pub fn with_enable_ident_normalization(self, value: bool) -> Self {
        Self {
            enable_ident_normalization: value,
            ..self
        }
    }

    /// Returns a copy of these options with `support_varchar_with_length` replaced.
    pub fn with_support_varchar_with_length(self, value: bool) -> Self {
        Self {
            support_varchar_with_length: value,
            ..self
        }
    }

    /// Returns a copy of these options with `map_string_types_to_utf8view` replaced.
    pub fn with_map_string_types_to_utf8view(self, value: bool) -> Self {
        Self {
            map_string_types_to_utf8view: value,
            ..self
        }
    }

    /// Returns a copy of these options with `enable_options_value_normalization` replaced.
    pub fn with_enable_options_value_normalization(self, value: bool) -> Self {
        Self {
            enable_options_value_normalization: value,
            ..self
        }
    }

    /// Returns a copy of these options with `collect_spans` replaced.
    pub fn with_collect_spans(self, value: bool) -> Self {
        Self {
            collect_spans: value,
            ..self
        }
    }
}

/// The default options are exactly those produced by [`ParserOptions::new`].
impl Default for ParserOptions {
    fn default() -> Self {
        Self::new()
    }
}
139
140impl From<&SqlParserOptions> for ParserOptions {
141    fn from(options: &SqlParserOptions) -> Self {
142        Self {
143            parse_float_as_decimal: options.parse_float_as_decimal,
144            enable_ident_normalization: options.enable_ident_normalization,
145            support_varchar_with_length: options.support_varchar_with_length,
146            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
147            enable_options_value_normalization: options
148                .enable_options_value_normalization,
149            collect_spans: options.collect_spans,
150        }
151    }
152}
153
154/// Ident Normalizer
155#[derive(Debug)]
156pub struct IdentNormalizer {
157    normalize: bool,
158}
159
160impl Default for IdentNormalizer {
161    fn default() -> Self {
162        Self { normalize: true }
163    }
164}
165
166impl IdentNormalizer {
167    pub fn new(normalize: bool) -> Self {
168        Self { normalize }
169    }
170
171    pub fn normalize(&self, ident: Ident) -> String {
172        if self.normalize {
173            crate::utils::normalize_ident(ident)
174        } else {
175            ident.value
176        }
177    }
178}
179
180/// Struct to store the states used by the Planner. The Planner will leverage the states
181/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include
182/// Common Table Expression (CTE) provided with WITH clause and
183/// Parameter Data Types provided with PREPARE statement and the query schema of the
184/// outer query plan.
185///
186/// # Cloning
187///
188/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned.
189/// This helps resolve scoping issues of CTEs.
190/// By using cloning, a subquery can inherit CTEs from the outer query
191/// and can also define its own private CTEs without affecting the outer query.
192///
193#[derive(Debug, Clone)]
194pub struct PlannerContext {
195    /// Data types for numbered parameters ($1, $2, etc), if supplied
196    /// in `PREPARE` statement
197    prepare_param_data_types: Arc<Vec<DataType>>,
198    /// Map of CTE name to logical plan of the WITH clause.
199    /// Use `Arc<LogicalPlan>` to allow cheap cloning
200    ctes: HashMap<String, Arc<LogicalPlan>>,
201    /// The query schema of the outer query plan, used to resolve the columns in subquery
202    outer_query_schema: Option<DFSchemaRef>,
203    /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
204    /// FROM clauses, this should become a suffix of the `outer_query_schema`.
205    outer_from_schema: Option<DFSchemaRef>,
206    /// The query schema defined by the table
207    create_table_schema: Option<DFSchemaRef>,
208}
209
210impl Default for PlannerContext {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216impl PlannerContext {
217    /// Create an empty PlannerContext
218    pub fn new() -> Self {
219        Self {
220            prepare_param_data_types: Arc::new(vec![]),
221            ctes: HashMap::new(),
222            outer_query_schema: None,
223            outer_from_schema: None,
224            create_table_schema: None,
225        }
226    }
227
228    /// Update the PlannerContext with provided prepare_param_data_types
229    pub fn with_prepare_param_data_types(
230        mut self,
231        prepare_param_data_types: Vec<DataType>,
232    ) -> Self {
233        self.prepare_param_data_types = prepare_param_data_types.into();
234        self
235    }
236
237    // Return a reference to the outer query's schema
238    pub fn outer_query_schema(&self) -> Option<&DFSchema> {
239        self.outer_query_schema.as_ref().map(|s| s.as_ref())
240    }
241
242    /// Sets the outer query schema, returning the existing one, if
243    /// any
244    pub fn set_outer_query_schema(
245        &mut self,
246        mut schema: Option<DFSchemaRef>,
247    ) -> Option<DFSchemaRef> {
248        std::mem::swap(&mut self.outer_query_schema, &mut schema);
249        schema
250    }
251
252    pub fn set_table_schema(
253        &mut self,
254        mut schema: Option<DFSchemaRef>,
255    ) -> Option<DFSchemaRef> {
256        std::mem::swap(&mut self.create_table_schema, &mut schema);
257        schema
258    }
259
260    pub fn table_schema(&self) -> Option<DFSchemaRef> {
261        self.create_table_schema.clone()
262    }
263
264    // Return a clone of the outer FROM schema
265    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
266        self.outer_from_schema.clone()
267    }
268
269    /// Sets the outer FROM schema, returning the existing one, if any
270    pub fn set_outer_from_schema(
271        &mut self,
272        mut schema: Option<DFSchemaRef>,
273    ) -> Option<DFSchemaRef> {
274        std::mem::swap(&mut self.outer_from_schema, &mut schema);
275        schema
276    }
277
278    /// Extends the FROM schema, returning the existing one, if any
279    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
280        match self.outer_from_schema.as_mut() {
281            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
282            None => self.outer_from_schema = Some(Arc::clone(schema)),
283        };
284        Ok(())
285    }
286
287    /// Return the types of parameters (`$1`, `$2`, etc) if known
288    pub fn prepare_param_data_types(&self) -> &[DataType] {
289        &self.prepare_param_data_types
290    }
291
292    /// Returns true if there is a Common Table Expression (CTE) /
293    /// Subquery for the specified name
294    pub fn contains_cte(&self, cte_name: &str) -> bool {
295        self.ctes.contains_key(cte_name)
296    }
297
298    /// Inserts a LogicalPlan for the Common Table Expression (CTE) /
299    /// Subquery for the specified name
300    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
301        let cte_name = cte_name.into();
302        self.ctes.insert(cte_name, Arc::new(plan));
303    }
304
305    /// Return a plan for the Common Table Expression (CTE) / Subquery for the
306    /// specified name
307    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
308        self.ctes.get(cte_name).map(|cte| cte.as_ref())
309    }
310
311    /// Remove the plan of CTE / Subquery for the specified name
312    pub(super) fn remove_cte(&mut self, cte_name: &str) {
313        self.ctes.remove(cte_name);
314    }
315}
316
/// SQL query planner and binder
///
/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
///
/// You can control the behavior of the planner by providing [`ParserOptions`].
///
/// It performs the following tasks:
///
/// 1. Name and type resolution (called "binding" in other systems). This
///    phase looks up table and column names using the [`ContextProvider`].
/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
///
/// It does not perform type coercion, or perform optimization, which are done
/// by subsequent passes.
///
/// Key interfaces are:
/// * [`Self::sql_statement_to_plan`]: Convert a statement
///   (e.g. `SELECT ...`) into a [`LogicalPlan`]
/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
pub struct SqlToRel<'a, S: ContextProvider> {
    /// Borrowed [`ContextProvider`]. Within this file it supplies the
    /// configuration options and the optional type planner.
    pub(crate) context_provider: &'a S,
    /// Parser options in effect for this planner instance.
    pub(crate) options: ParserOptions,
    /// Identifier normalizer, constructed from
    /// `options.enable_ident_normalization` when the planner is created.
    pub(crate) ident_normalizer: IdentNormalizer,
}
341
342impl<'a, S: ContextProvider> SqlToRel<'a, S> {
343    /// Create a new query planner.
344    ///
345    /// The query planner derives the parser options from the context provider.
346    pub fn new(context_provider: &'a S) -> Self {
347        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
348        Self::new_with_options(context_provider, parser_options)
349    }
350
351    /// Create a new query planner with the given parser options.
352    ///
353    /// The query planner ignores the parser options from the context provider
354    /// and uses the given parser options instead.
355    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
356        let ident_normalize = options.enable_ident_normalization;
357
358        SqlToRel {
359            context_provider,
360            options,
361            ident_normalizer: IdentNormalizer::new(ident_normalize),
362        }
363    }
364
365    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
366        let mut fields = Vec::with_capacity(columns.len());
367
368        for column in columns {
369            let data_type = self.convert_data_type(&column.data_type)?;
370            let not_nullable = column
371                .options
372                .iter()
373                .any(|x| x.option == ColumnOption::NotNull);
374            fields.push(Field::new(
375                self.ident_normalizer.normalize(column.name),
376                data_type,
377                !not_nullable,
378            ));
379        }
380
381        Ok(Schema::new(fields))
382    }
383
384    /// Returns a vector of (column_name, default_expr) pairs
385    pub(super) fn build_column_defaults(
386        &self,
387        columns: &Vec<SQLColumnDef>,
388        planner_context: &mut PlannerContext,
389    ) -> Result<Vec<(String, Expr)>> {
390        let mut column_defaults = vec![];
391        // Default expressions are restricted, column references are not allowed
392        let empty_schema = DFSchema::empty();
393        let error_desc = |e: DataFusionError| match e {
394            DataFusionError::SchemaError(ref err, _)
395                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
396            {
397                plan_datafusion_err!(
398                    "Column reference is not allowed in the DEFAULT expression : {}",
399                    e
400                )
401            }
402            _ => e,
403        };
404
405        for column in columns {
406            if let Some(default_sql_expr) =
407                column.options.iter().find_map(|o| match &o.option {
408                    ColumnOption::Default(expr) => Some(expr),
409                    _ => None,
410                })
411            {
412                let default_expr = self
413                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
414                    .map_err(error_desc)?;
415                column_defaults.push((
416                    self.ident_normalizer.normalize(column.name.clone()),
417                    default_expr,
418                ));
419            }
420        }
421        Ok(column_defaults)
422    }
423
424    /// Apply the given TableAlias to the input plan
425    pub(crate) fn apply_table_alias(
426        &self,
427        plan: LogicalPlan,
428        alias: TableAlias,
429    ) -> Result<LogicalPlan> {
430        let idents = alias.columns.into_iter().map(|c| c.name).collect();
431        let plan = self.apply_expr_alias(plan, idents)?;
432
433        LogicalPlanBuilder::from(plan)
434            .alias(TableReference::bare(
435                self.ident_normalizer.normalize(alias.name),
436            ))?
437            .build()
438    }
439
440    pub(crate) fn apply_expr_alias(
441        &self,
442        plan: LogicalPlan,
443        idents: Vec<Ident>,
444    ) -> Result<LogicalPlan> {
445        if idents.is_empty() {
446            Ok(plan)
447        } else if idents.len() != plan.schema().fields().len() {
448            plan_err!(
449                "Source table contains {} columns but only {} \
450                names given as column alias",
451                plan.schema().fields().len(),
452                idents.len()
453            )
454        } else {
455            let fields = plan.schema().fields().clone();
456            LogicalPlanBuilder::from(plan)
457                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
458                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
459                }))?
460                .build()
461        }
462    }
463
464    /// Validate the schema provides all of the columns referenced in the expressions.
465    pub(crate) fn validate_schema_satisfies_exprs(
466        &self,
467        schema: &DFSchema,
468        exprs: &[Expr],
469    ) -> Result<()> {
470        find_column_exprs(exprs)
471            .iter()
472            .try_for_each(|col| match col {
473                Expr::Column(col) => match &col.relation {
474                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
475                    None => {
476                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
477                            Ok(())
478                        } else {
479                            Err(field_not_found(
480                                col.relation.clone(),
481                                col.name.as_str(),
482                                schema,
483                            ))
484                        }
485                    }
486                }
487                .map_err(|err: DataFusionError| match &err {
488                    DataFusionError::SchemaError(inner, _)
489                        if matches!(
490                            inner.as_ref(),
491                            SchemaError::FieldNotFound { .. }
492                        ) =>
493                    {
494                        let SchemaError::FieldNotFound {
495                            field,
496                            valid_fields,
497                        } = inner.as_ref()
498                        else {
499                            unreachable!()
500                        };
501                        let mut diagnostic = if let Some(relation) = &col.relation {
502                            Diagnostic::new_error(
503                                format!(
504                                    "column '{}' not found in '{}'",
505                                    &col.name, relation
506                                ),
507                                col.spans().first(),
508                            )
509                        } else {
510                            Diagnostic::new_error(
511                                format!("column '{}' not found", &col.name),
512                                col.spans().first(),
513                            )
514                        };
515                        add_possible_columns_to_diag(
516                            &mut diagnostic,
517                            field,
518                            valid_fields,
519                        );
520                        err.with_diagnostic(diagnostic)
521                    }
522                    _ => err,
523                }),
524                _ => internal_err!("Not a column"),
525            })
526    }
527
528    pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
529        // First check if any of the registered type_planner can handle this type
530        if let Some(type_planner) = self.context_provider.get_type_planner() {
531            if let Some(data_type) = type_planner.plan_type(sql_type)? {
532                return Ok(data_type);
533            }
534        }
535
536        // If no type_planner can handle this type, use the default conversion
537        match sql_type {
538            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
539                // Arrays may be multi-dimensional.
540                let inner_data_type = self.convert_data_type(inner_sql_type)?;
541                Ok(DataType::new_list(inner_data_type, true))
542            }
543            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
544                inner_sql_type,
545                maybe_array_size,
546            )) => {
547                let inner_data_type = self.convert_data_type(inner_sql_type)?;
548                if let Some(array_size) = maybe_array_size {
549                    Ok(DataType::new_fixed_size_list(
550                        inner_data_type,
551                        *array_size as i32,
552                        true,
553                    ))
554                } else {
555                    Ok(DataType::new_list(inner_data_type, true))
556                }
557            }
558            SQLDataType::Array(ArrayElemTypeDef::None) => {
559                not_impl_err!("Arrays with unspecified type is not supported")
560            }
561            other => self.convert_simple_data_type(other),
562        }
563    }
564
565    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
566        match sql_type {
567            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
568            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
569            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
570            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
571                Ok(DataType::Int32)
572            }
573            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
574            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
575            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
576                Ok(DataType::UInt16)
577            }
578            SQLDataType::IntUnsigned(_)
579            | SQLDataType::IntegerUnsigned(_)
580            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
581            SQLDataType::Varchar(length) => {
582                match (length, self.options.support_varchar_with_length) {
583                    (Some(_), false) => plan_err!(
584                        "does not support Varchar with length, \
585                    please set `support_varchar_with_length` to be true"
586                    ),
587                    _ => {
588                        if self.options.map_string_types_to_utf8view {
589                            Ok(DataType::Utf8View)
590                        } else {
591                            Ok(DataType::Utf8)
592                        }
593                    }
594                }
595            }
596            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
597                Ok(DataType::UInt64)
598            }
599            SQLDataType::Float(_) => Ok(DataType::Float32),
600            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
601            SQLDataType::Double(ExactNumberInfo::None)
602            | SQLDataType::DoublePrecision
603            | SQLDataType::Float8 => Ok(DataType::Float64),
604            SQLDataType::Double(
605                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
606            ) => {
607                not_impl_err!(
608                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
609                )
610            }
611            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
612                if self.options.map_string_types_to_utf8view {
613                    Ok(DataType::Utf8View)
614                } else {
615                    Ok(DataType::Utf8)
616                }
617            }
618            SQLDataType::Timestamp(precision, tz_info)
619                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
620            {
621                let tz = if matches!(tz_info, TimezoneInfo::Tz)
622                    || matches!(tz_info, TimezoneInfo::WithTimeZone)
623                {
624                    // Timestamp With Time Zone
625                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
626                    // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
627                    Some(self.context_provider.options().execution.time_zone.clone())
628                } else {
629                    // Timestamp Without Time zone
630                    None
631                };
632                let precision = match precision {
633                    Some(0) => TimeUnit::Second,
634                    Some(3) => TimeUnit::Millisecond,
635                    Some(6) => TimeUnit::Microsecond,
636                    None | Some(9) => TimeUnit::Nanosecond,
637                    _ => unreachable!(),
638                };
639                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
640            }
641            SQLDataType::Date => Ok(DataType::Date32),
642            SQLDataType::Time(None, tz_info) => {
643                if matches!(tz_info, TimezoneInfo::None)
644                    || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
645                {
646                    Ok(DataType::Time64(TimeUnit::Nanosecond))
647                } else {
648                    // We don't support TIMETZ and TIME WITH TIME ZONE for now
649                    not_impl_err!("Unsupported SQL type {sql_type:?}")
650                }
651            }
652            SQLDataType::Numeric(exact_number_info)
653            | SQLDataType::Decimal(exact_number_info) => {
654                let (precision, scale) = match *exact_number_info {
655                    ExactNumberInfo::None => (None, None),
656                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
657                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
658                        (Some(precision), Some(scale))
659                    }
660                };
661                make_decimal_type(precision, scale)
662            }
663            SQLDataType::Bytea => Ok(DataType::Binary),
664            SQLDataType::Interval => Ok(DataType::Interval(IntervalUnit::MonthDayNano)),
665            SQLDataType::Struct(fields, _) => {
666                let fields = fields
667                    .iter()
668                    .enumerate()
669                    .map(|(idx, field)| {
670                        let data_type = self.convert_data_type(&field.field_type)?;
671                        let field_name = match &field.field_name {
672                            Some(ident) => ident.clone(),
673                            None => Ident::new(format!("c{idx}")),
674                        };
675                        Ok(Arc::new(Field::new(
676                            self.ident_normalizer.normalize(field_name),
677                            data_type,
678                            true,
679                        )))
680                    })
681                    .collect::<Result<Vec<_>>>()?;
682                Ok(DataType::Struct(Fields::from(fields)))
683            }
684            SQLDataType::Nvarchar(_)
685            | SQLDataType::JSON
686            | SQLDataType::Uuid
687            | SQLDataType::Binary(_)
688            | SQLDataType::Varbinary(_)
689            | SQLDataType::Blob(_)
690            | SQLDataType::Datetime(_)
691            | SQLDataType::Regclass
692            | SQLDataType::Custom(_, _)
693            | SQLDataType::Array(_)
694            | SQLDataType::Enum(_, _)
695            | SQLDataType::Set(_)
696            | SQLDataType::MediumInt(_)
697            | SQLDataType::MediumIntUnsigned(_)
698            | SQLDataType::Character(_)
699            | SQLDataType::CharacterVarying(_)
700            | SQLDataType::CharVarying(_)
701            | SQLDataType::CharacterLargeObject(_)
702            | SQLDataType::CharLargeObject(_)
703            | SQLDataType::Timestamp(_, _)
704            | SQLDataType::Time(Some(_), _)
705            | SQLDataType::Dec(_)
706            | SQLDataType::BigNumeric(_)
707            | SQLDataType::BigDecimal(_)
708            | SQLDataType::Clob(_)
709            | SQLDataType::Bytes(_)
710            | SQLDataType::Int64
711            | SQLDataType::Float64
712            | SQLDataType::JSONB
713            | SQLDataType::Unspecified
714            | SQLDataType::Int16
715            | SQLDataType::Int32
716            | SQLDataType::Int128
717            | SQLDataType::Int256
718            | SQLDataType::UInt8
719            | SQLDataType::UInt16
720            | SQLDataType::UInt32
721            | SQLDataType::UInt64
722            | SQLDataType::UInt128
723            | SQLDataType::UInt256
724            | SQLDataType::Float32
725            | SQLDataType::Date32
726            | SQLDataType::Datetime64(_, _)
727            | SQLDataType::FixedString(_)
728            | SQLDataType::Map(_, _)
729            | SQLDataType::Tuple(_)
730            | SQLDataType::Nested(_)
731            | SQLDataType::Union(_)
732            | SQLDataType::Nullable(_)
733            | SQLDataType::LowCardinality(_)
734            | SQLDataType::Trigger
735            | SQLDataType::TinyBlob
736            | SQLDataType::MediumBlob
737            | SQLDataType::LongBlob
738            | SQLDataType::TinyText
739            | SQLDataType::MediumText
740            | SQLDataType::LongText
741            | SQLDataType::Bit(_)
742            | SQLDataType::BitVarying(_)
743            | SQLDataType::Signed
744            | SQLDataType::SignedInteger
745            | SQLDataType::Unsigned
746            | SQLDataType::UnsignedInteger
747            | SQLDataType::AnyType
748            | SQLDataType::Table(_)
749            | SQLDataType::VarBit(_)
750            | SQLDataType::GeometricType(_) => {
751                not_impl_err!("Unsupported SQL type {sql_type:?}")
752            }
753        }
754    }
755
756    pub(crate) fn object_name_to_table_reference(
757        &self,
758        object_name: ObjectName,
759    ) -> Result<TableReference> {
760        object_name_to_table_reference(
761            object_name,
762            self.options.enable_ident_normalization,
763        )
764    }
765}
766
767/// Create a [`TableReference`] after normalizing the specified ObjectName
768///
769/// Examples
770/// ```text
771/// ['foo']          -> Bare { table: "foo" }
772/// ['"foo.bar"]]    -> Bare { table: "foo.bar" }
773/// ['foo', 'Bar']   -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
774/// ['foo', 'bar']   -> Partial { schema: "foo", table: "bar" }
775/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
776/// ```
777pub fn object_name_to_table_reference(
778    object_name: ObjectName,
779    enable_normalization: bool,
780) -> Result<TableReference> {
781    // Use destructure to make it clear no fields on ObjectName are ignored
782    let ObjectName(object_name_parts) = object_name;
783    let idents = object_name_parts
784        .into_iter()
785        .map(|object_name_part| {
786            object_name_part.as_ident().cloned().ok_or_else(|| {
787                plan_datafusion_err!(
788                    "Expected identifier, but found: {:?}",
789                    object_name_part
790                )
791            })
792        })
793        .collect::<Result<Vec<_>>>()?;
794    idents_to_table_reference(idents, enable_normalization)
795}
796
797struct IdentTaker {
798    normalizer: IdentNormalizer,
799    idents: Vec<Ident>,
800}
801
802/// Take the next identifier from the back of idents, panic'ing if
803/// there are none left
804impl IdentTaker {
805    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
806        Self {
807            normalizer: IdentNormalizer::new(enable_normalization),
808            idents,
809        }
810    }
811
812    fn take(&mut self) -> String {
813        let ident = self.idents.pop().expect("no more identifiers");
814        self.normalizer.normalize(ident)
815    }
816
817    /// Returns the number of remaining identifiers
818    fn len(&self) -> usize {
819        self.idents.len()
820    }
821}
822
823// impl Display for a nicer error message
824impl std::fmt::Display for IdentTaker {
825    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826        let mut first = true;
827        for ident in self.idents.iter() {
828            if !first {
829                write!(f, ".")?;
830            }
831            write!(f, "{ident}")?;
832            first = false;
833        }
834
835        Ok(())
836    }
837}
838
839/// Create a [`TableReference`] after normalizing the specified identifier
840pub(crate) fn idents_to_table_reference(
841    idents: Vec<Ident>,
842    enable_normalization: bool,
843) -> Result<TableReference> {
844    let mut taker = IdentTaker::new(idents, enable_normalization);
845
846    match taker.len() {
847        1 => {
848            let table = taker.take();
849            Ok(TableReference::bare(table))
850        }
851        2 => {
852            let table = taker.take();
853            let schema = taker.take();
854            Ok(TableReference::partial(schema, table))
855        }
856        3 => {
857            let table = taker.take();
858            let schema = taker.take();
859            let catalog = taker.take();
860            Ok(TableReference::full(catalog, schema, table))
861        }
862        _ => plan_err!(
863            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
864            taker,
865            taker.len()
866        ),
867    }
868}
869
870/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
871/// from the provided object identifiers (catalog, schema and table names).
872pub fn object_name_to_qualifier(
873    sql_table_name: &ObjectName,
874    enable_normalization: bool,
875) -> Result<String> {
876    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
877    let normalizer = IdentNormalizer::new(enable_normalization);
878    sql_table_name
879        .0
880        .iter()
881        .rev()
882        .zip(columns)
883        .map(|(object_name_part, column_name)| {
884            object_name_part
885                .as_ident()
886                .map(|ident| {
887                    format!(
888                        r#"{} = '{}'"#,
889                        column_name,
890                        normalizer.normalize(ident.clone())
891                    )
892                })
893                .ok_or_else(|| {
894                    plan_datafusion_err!(
895                        "Expected identifier, but found: {:?}",
896                        object_name_part
897                    )
898                })
899        })
900        .collect::<Result<Vec<_>>>()
901        .map(|parts| parts.join(" AND "))
902}