1use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::TableReference;
27use datafusion_common::config::SqlParserOptions;
28use datafusion_common::datatype::{DataTypeExt, FieldExt};
29use datafusion_common::error::add_possible_columns_to_diag;
30use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
31use datafusion_common::{
32 DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
33 plan_datafusion_err,
34};
35use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
36pub use datafusion_expr::planner::ContextProvider;
37use datafusion_expr::utils::find_column_exprs;
38use datafusion_expr::{Expr, col};
39use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
40use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
41use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
42
/// Options governing how SQL text is parsed and translated to logical plans.
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// When true, SQL float literals are parsed as decimals rather than floats.
    pub parse_float_as_decimal: bool,
    /// When true, unquoted identifiers are normalized (see [`IdentNormalizer`]).
    pub enable_ident_normalization: bool,
    /// When true, `VARCHAR(n)` is accepted; otherwise a length argument is a
    /// plan error (the length itself is not enforced during conversion).
    pub support_varchar_with_length: bool,
    /// When true, values in SQL `OPTIONS` clauses are normalized.
    pub enable_options_value_normalization: bool,
    /// When true, source spans are collected for diagnostics.
    pub collect_spans: bool,
    /// When true, SQL string types (CHAR/TEXT/STRING/VARCHAR) map to
    /// `Utf8View` instead of `Utf8`.
    pub map_string_types_to_utf8view: bool,
    /// Default NULL placement for `ORDER BY` when none is specified.
    pub default_null_ordering: NullOrdering,
}
61
impl ParserOptions {
    /// Creates parser options with their default settings.
    pub fn new() -> Self {
        Self {
            parse_float_as_decimal: false,
            enable_ident_normalization: true,
            support_varchar_with_length: true,
            map_string_types_to_utf8view: true,
            enable_options_value_normalization: false,
            collect_spans: false,
            default_null_ordering: NullOrdering::NullsMax,
        }
    }

    /// Sets whether float literals parse as decimals (builder style).
    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
        self.parse_float_as_decimal = value;
        self
    }

    /// Sets whether unquoted identifiers are normalized (builder style).
    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
        self.enable_ident_normalization = value;
        self
    }

    /// Sets whether `VARCHAR(n)` is accepted (builder style).
    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
        self.support_varchar_with_length = value;
        self
    }

    /// Sets whether string types map to `Utf8View` (builder style).
    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
        self.map_string_types_to_utf8view = value;
        self
    }

    /// Sets whether `OPTIONS` values are normalized (builder style).
    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
        self.enable_options_value_normalization = value;
        self
    }

    /// Sets whether source spans are collected (builder style).
    pub fn with_collect_spans(mut self, value: bool) -> Self {
        self.collect_spans = value;
        self
    }

    /// Sets the default NULL ordering for `ORDER BY` (builder style).
    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
        self.default_null_ordering = value;
        self
    }
}
145
146impl Default for ParserOptions {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
impl From<&SqlParserOptions> for ParserOptions {
    /// Builds [`ParserOptions`] from the config-level [`SqlParserOptions`].
    ///
    /// The string-valued `default_null_ordering` is converted via
    /// `From<&str>`, which falls back to `NullsMax` on unrecognized values.
    fn from(options: &SqlParserOptions) -> Self {
        Self {
            parse_float_as_decimal: options.parse_float_as_decimal,
            enable_ident_normalization: options.enable_ident_normalization,
            support_varchar_with_length: options.support_varchar_with_length,
            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
            enable_options_value_normalization: options
                .enable_options_value_normalization,
            collect_spans: options.collect_spans,
            default_null_ordering: options.default_null_ordering.as_str().into(),
        }
    }
}
166
/// Where NULL values sort by default in an `ORDER BY` clause.
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
    /// NULLs compare as the largest value: last when ascending, first when descending.
    NullsMax,
    /// NULLs compare as the smallest value: first when ascending, last when descending.
    NullsMin,
    /// NULLs always sort first, regardless of sort direction.
    NullsFirst,
    /// NULLs always sort last, regardless of sort direction.
    NullsLast,
}
179
180impl NullOrdering {
181 pub fn nulls_first(&self, asc: bool) -> bool {
187 match self {
188 Self::NullsMax => !asc,
189 Self::NullsMin => asc,
190 Self::NullsFirst => true,
191 Self::NullsLast => false,
192 }
193 }
194}
195
impl FromStr for NullOrdering {
    type Err = DataFusionError;

    /// Parses the lowercase names `"nulls_max"`, `"nulls_min"`,
    /// `"nulls_first"`, and `"nulls_last"`; anything else is a plan error.
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "nulls_max" => Ok(Self::NullsMax),
            "nulls_min" => Ok(Self::NullsMin),
            "nulls_first" => Ok(Self::NullsFirst),
            "nulls_last" => Ok(Self::NullsLast),
            _ => plan_err!(
                "Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"
            ),
        }
    }
}
211
impl From<&str> for NullOrdering {
    /// Lenient conversion used for configuration values: unrecognized input
    /// silently falls back to `NullsMax` rather than erroring.
    fn from(s: &str) -> Self {
        Self::from_str(s).unwrap_or(Self::NullsMax)
    }
}
217
/// Normalizes SQL identifiers when enabled, or passes them through verbatim.
#[derive(Debug)]
pub struct IdentNormalizer {
    // When false, identifiers are returned unchanged.
    normalize: bool,
}
223
224impl Default for IdentNormalizer {
225 fn default() -> Self {
226 Self { normalize: true }
227 }
228}
229
230impl IdentNormalizer {
231 pub fn new(normalize: bool) -> Self {
232 Self { normalize }
233 }
234
235 pub fn normalize(&self, ident: Ident) -> String {
236 if self.normalize {
237 crate::utils::normalize_ident(ident)
238 } else {
239 ident.value
240 }
241 }
242}
243
/// Mutable state threaded through SQL-to-plan translation: prepared-statement
/// parameter types, registered CTEs, and schemas of enclosing queries.
#[derive(Debug, Clone)]
pub struct PlannerContext {
    // Types of `PREPARE` statement parameters, shared cheaply via Arc.
    prepare_param_data_types: Arc<Vec<FieldRef>>,
    // CTE plans registered so far, keyed by CTE name.
    ctes: HashMap<String, Arc<LogicalPlan>>,

    // Stack of outer-query schemas, pushed as subquery planning nests.
    outer_queries_schemas_stack: Vec<DFSchemaRef>,
    // Schema of the outer query's FROM clause, if planning a correlated subquery.
    outer_from_schema: Option<DFSchemaRef>,
    // Schema in effect while planning a CREATE TABLE statement.
    create_table_schema: Option<DFSchemaRef>,
}
274
275impl Default for PlannerContext {
276 fn default() -> Self {
277 Self::new()
278 }
279}
280
281impl PlannerContext {
282 pub fn new() -> Self {
284 Self {
285 prepare_param_data_types: Arc::new(vec![]),
286 ctes: HashMap::new(),
287 outer_queries_schemas_stack: vec![],
288 outer_from_schema: None,
289 create_table_schema: None,
290 }
291 }
292
293 pub fn with_prepare_param_data_types(
295 mut self,
296 prepare_param_data_types: Vec<FieldRef>,
297 ) -> Self {
298 self.prepare_param_data_types = prepare_param_data_types.into();
299 self
300 }
301
302 pub fn outer_queries_schemas(&self) -> &[DFSchemaRef] {
305 &self.outer_queries_schemas_stack
306 }
307
308 pub fn outer_schemas_iter(&self) -> impl Iterator<Item = &DFSchemaRef> {
321 self.outer_queries_schemas_stack.iter().rev()
322 }
323
324 pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
327 self.outer_queries_schemas_stack.push(schema);
328 }
329
330 pub fn latest_outer_query_schema(&self) -> Option<&DFSchemaRef> {
332 self.outer_queries_schemas_stack.last()
333 }
334
335 pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
337 self.outer_queries_schemas_stack.pop()
338 }
339
340 pub fn set_table_schema(
341 &mut self,
342 mut schema: Option<DFSchemaRef>,
343 ) -> Option<DFSchemaRef> {
344 std::mem::swap(&mut self.create_table_schema, &mut schema);
345 schema
346 }
347
348 pub fn table_schema(&self) -> Option<DFSchemaRef> {
349 self.create_table_schema.clone()
350 }
351
352 pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
354 self.outer_from_schema.clone()
355 }
356
357 pub fn set_outer_from_schema(
359 &mut self,
360 mut schema: Option<DFSchemaRef>,
361 ) -> Option<DFSchemaRef> {
362 std::mem::swap(&mut self.outer_from_schema, &mut schema);
363 schema
364 }
365
366 pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
368 match self.outer_from_schema.as_mut() {
369 Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
370 None => self.outer_from_schema = Some(Arc::clone(schema)),
371 };
372 Ok(())
373 }
374
375 pub fn prepare_param_data_types(&self) -> &[FieldRef] {
377 &self.prepare_param_data_types
378 }
379
380 pub fn contains_cte(&self, cte_name: &str) -> bool {
383 self.ctes.contains_key(cte_name)
384 }
385
386 pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
389 let cte_name = cte_name.into();
390 self.ctes.insert(cte_name, Arc::new(plan));
391 }
392
393 pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
396 self.ctes.get(cte_name).map(|cte| cte.as_ref())
397 }
398
399 pub(super) fn remove_cte(&mut self, cte_name: &str) {
401 self.ctes.remove(cte_name);
402 }
403}
404
/// Translates a SQL AST (from `sqlparser`) into DataFusion logical plans,
/// resolving tables, functions, and types through a [`ContextProvider`].
pub struct SqlToRel<'a, S: ContextProvider> {
    // Resolves catalog objects and exposes session configuration.
    pub(crate) context_provider: &'a S,
    // Parsing/translation options; see [`ParserOptions`].
    pub(crate) options: ParserOptions,
    // Identifier normalizer configured from `options.enable_ident_normalization`.
    pub(crate) ident_normalizer: IdentNormalizer,
}
429
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
    /// Creates a planner using the parser options found in the provider's
    /// session configuration.
    pub fn new(context_provider: &'a S) -> Self {
        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
        Self::new_with_options(context_provider, parser_options)
    }

    /// Creates a planner with explicitly supplied [`ParserOptions`].
    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
        let ident_normalize = options.enable_ident_normalization;

        SqlToRel {
            context_provider,
            options,
            ident_normalizer: IdentNormalizer::new(ident_normalize),
        }
    }

    /// Builds an Arrow [`Schema`] from SQL column definitions.
    ///
    /// A column is non-nullable only if it carries a `NOT NULL` option;
    /// column names are run through the identifier normalizer.
    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
        let mut fields = Vec::with_capacity(columns.len());

        for column in columns {
            let data_type = self.convert_data_type_to_field(&column.data_type)?;
            let not_nullable = column
                .options
                .iter()
                .any(|x| x.option == ColumnOption::NotNull);
            fields.push(
                data_type
                    .as_ref()
                    .clone()
                    .with_name(self.ident_normalizer.normalize(column.name))
                    .with_nullable(!not_nullable),
            );
        }

        Ok(Schema::new(fields))
    }

    /// Collects `(normalized_column_name, default_expr)` pairs for every
    /// column that declares a `DEFAULT` option.
    ///
    /// Defaults are planned against an empty schema, so any column reference
    /// inside a default expression surfaces as a dedicated plan error.
    pub(super) fn build_column_defaults(
        &self,
        columns: &Vec<SQLColumnDef>,
        planner_context: &mut PlannerContext,
    ) -> Result<Vec<(String, Expr)>> {
        let mut column_defaults = vec![];
        // Empty schema: a FieldNotFound during expression planning means the
        // default referenced a column, which is not allowed.
        let empty_schema = DFSchema::empty();
        let error_desc = |e: DataFusionError| match e {
            DataFusionError::SchemaError(ref err, _)
                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
            {
                plan_datafusion_err!(
                    "Column reference is not allowed in the DEFAULT expression : {}",
                    e
                )
            }
            _ => e,
        };

        for column in columns {
            if let Some(default_sql_expr) =
                column.options.iter().find_map(|o| match &o.option {
                    ColumnOption::Default(expr) => Some(expr),
                    _ => None,
                })
            {
                let default_expr = self
                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
                    .map_err(error_desc)?;
                column_defaults.push((
                    self.ident_normalizer.normalize(column.name.clone()),
                    default_expr,
                ));
            }
        }
        Ok(column_defaults)
    }

    /// Applies a SQL table alias (and any column aliases it carries) to a
    /// logical plan, wrapping it in a subquery alias node.
    pub(crate) fn apply_table_alias(
        &self,
        plan: LogicalPlan,
        alias: TableAlias,
    ) -> Result<LogicalPlan> {
        // Column aliases are applied first, then the table-level alias.
        let idents = alias.columns.into_iter().map(|c| c.name).collect();
        let plan = self.apply_expr_alias(plan, idents)?;

        LogicalPlanBuilder::from(plan)
            .alias(TableReference::bare(
                self.ident_normalizer.normalize(alias.name),
            ))?
            .build()
    }

    /// Renames the plan's output columns to `idents` via a projection.
    ///
    /// No-op when `idents` is empty; a plan error when the alias count does
    /// not match the plan's column count.
    pub(crate) fn apply_expr_alias(
        &self,
        plan: LogicalPlan,
        idents: Vec<Ident>,
    ) -> Result<LogicalPlan> {
        if idents.is_empty() {
            Ok(plan)
        } else if idents.len() != plan.schema().fields().len() {
            plan_err!(
                "Source table contains {} columns but only {} \
                names given as column alias",
                plan.schema().fields().len(),
                idents.len()
            )
        } else {
            let fields = plan.schema().fields().clone();
            LogicalPlanBuilder::from(plan)
                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
                }))?
                .build()
        }
    }

    /// Verifies that every column referenced by `exprs` exists in `schema`,
    /// attaching a "column not found" diagnostic (with suggestions) on
    /// failure.
    pub(crate) fn validate_schema_satisfies_exprs(
        &self,
        schema: &DFSchema,
        exprs: &[Expr],
    ) -> Result<()> {
        find_column_exprs(exprs)
            .iter()
            .try_for_each(|col| match col {
                Expr::Column(col) => match &col.relation {
                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
                    None => {
                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
                            Ok(())
                        } else {
                            Err(field_not_found(
                                col.relation.clone(),
                                col.name.as_str(),
                                schema,
                            ))
                        }
                    }
                }
                // Enrich FieldNotFound errors with a span-anchored diagnostic
                // listing possible intended columns.
                .map_err(|err: DataFusionError| match &err {
                    DataFusionError::SchemaError(inner, _)
                        if matches!(
                            inner.as_ref(),
                            SchemaError::FieldNotFound { .. }
                        ) =>
                    {
                        let SchemaError::FieldNotFound {
                            field,
                            valid_fields,
                        } = inner.as_ref()
                        else {
                            unreachable!()
                        };
                        let mut diagnostic = if let Some(relation) = &col.relation {
                            Diagnostic::new_error(
                                format!(
                                    "column '{}' not found in '{}'",
                                    &col.name, relation
                                ),
                                col.spans().first(),
                            )
                        } else {
                            Diagnostic::new_error(
                                format!("column '{}' not found", &col.name),
                                col.spans().first(),
                            )
                        };
                        add_possible_columns_to_diag(
                            &mut diagnostic,
                            field,
                            valid_fields,
                        );
                        err.with_diagnostic(diagnostic)
                    }
                    _ => err,
                }),
                _ => internal_err!("Not a column"),
            })
    }

    /// Converts a SQL type into an Arrow [`FieldRef`], handling array types
    /// (list / fixed-size list) and delegating scalars to
    /// [`Self::convert_simple_data_type`].
    ///
    /// A custom type planner from the context provider, if any, takes
    /// precedence.
    pub(crate) fn convert_data_type_to_field(
        &self,
        sql_type: &SQLDataType,
    ) -> Result<FieldRef> {
        if let Some(type_planner) = self.context_provider.get_type_planner()
            && let Some(data_type) = type_planner.plan_type(sql_type)?
        {
            return Ok(data_type.into_nullable_field_ref());
        }

        match sql_type {
            // e.g. `ARRAY<INT>` becomes a variable-length list.
            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
                Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
            }
            // e.g. `INT[]` or `INT[3]`; a size yields a fixed-size list.
            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
                inner_sql_type,
                maybe_array_size,
            )) => {
                let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
                if let Some(array_size) = maybe_array_size {
                    let array_size: i32 = (*array_size).try_into().map_err(|_| {
                        plan_datafusion_err!(
                            "Array size must be a positive 32 bit integer, got {array_size}"
                        )
                    })?;
                    Ok(inner_field.into_fixed_size_list(array_size))
                } else {
                    Ok(inner_field.into_list())
                }
            }
            SQLDataType::Array(ArrayElemTypeDef::None) => {
                not_impl_err!("Arrays with unspecified type is not supported")
            }
            other => Ok(self
                .convert_simple_data_type(other)?
                .into_nullable_field_ref()),
        }
    }

    /// Maps a non-array SQL scalar type to an Arrow [`DataType`].
    ///
    /// Returns `not_impl_err!` for SQL types DataFusion does not support.
    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
        match sql_type {
            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
                Ok(DataType::Int32)
            }
            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
                Ok(DataType::UInt16)
            }
            SQLDataType::IntUnsigned(_)
            | SQLDataType::IntegerUnsigned(_)
            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
            // VARCHAR length is accepted only when the option allows it; the
            // length itself is never enforced.
            SQLDataType::Varchar(length) => {
                match (length, self.options.support_varchar_with_length) {
                    (Some(_), false) => plan_err!(
                        "does not support Varchar with length, \
                        please set `support_varchar_with_length` to be true"
                    ),
                    _ => {
                        if self.options.map_string_types_to_utf8view {
                            Ok(DataType::Utf8View)
                        } else {
                            Ok(DataType::Utf8)
                        }
                    }
                }
            }
            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
                Ok(DataType::UInt64)
            }
            SQLDataType::Float(_) => Ok(DataType::Float32),
            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
            SQLDataType::Double(ExactNumberInfo::None)
            | SQLDataType::DoublePrecision
            | SQLDataType::Float8 => Ok(DataType::Float64),
            SQLDataType::Double(
                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
            ) => {
                not_impl_err!(
                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
                )
            }
            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
                if self.options.map_string_types_to_utf8view {
                    Ok(DataType::Utf8View)
                } else {
                    Ok(DataType::Utf8)
                }
            }
            // Only precisions 0/3/6/9 have a matching Arrow TimeUnit; other
            // precisions fall through to the unsupported-type arm below.
            SQLDataType::Timestamp(precision, tz_info)
                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
            {
                // `TIMESTAMP WITH TIME ZONE` picks up the session time zone.
                let tz = if *tz_info == TimezoneInfo::Tz
                    || *tz_info == TimezoneInfo::WithTimeZone
                {
                    self.context_provider.options().execution.time_zone.clone()
                } else {
                    None
                };
                let precision = match precision {
                    Some(0) => TimeUnit::Second,
                    Some(3) => TimeUnit::Millisecond,
                    Some(6) => TimeUnit::Microsecond,
                    None | Some(9) => TimeUnit::Nanosecond,
                    // Excluded by the match guard above.
                    _ => unreachable!(),
                };
                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
            }
            SQLDataType::Date => Ok(DataType::Date32),
            SQLDataType::Time(None, tz_info) => {
                if *tz_info == TimezoneInfo::None
                    || *tz_info == TimezoneInfo::WithoutTimeZone
                {
                    Ok(DataType::Time64(TimeUnit::Nanosecond))
                } else {
                    // Time with a time zone is not representable in Arrow.
                    not_impl_err!("Unsupported SQL type {sql_type}")
                }
            }
            SQLDataType::Numeric(exact_number_info)
            | SQLDataType::Decimal(exact_number_info) => {
                let (precision, scale) = match *exact_number_info {
                    ExactNumberInfo::None => (None, None),
                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
                        (Some(precision), Some(scale))
                    }
                };
                make_decimal_type(precision, scale.map(|s| s as u64))
            }
            SQLDataType::Bytea => Ok(DataType::Binary),
            SQLDataType::Interval { fields, precision } => {
                if fields.is_some() || precision.is_some() {
                    return not_impl_err!("Unsupported SQL type {sql_type}");
                }
                Ok(DataType::Interval(IntervalUnit::MonthDayNano))
            }
            SQLDataType::Struct(fields, _) => {
                let fields = fields
                    .iter()
                    .enumerate()
                    .map(|(idx, sql_struct_field)| {
                        let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
                        // Unnamed struct fields get positional names c0, c1, ...
                        let field_name = match &sql_struct_field.field_name {
                            Some(ident) => ident.clone(),
                            None => Ident::new(format!("c{idx}")),
                        };
                        Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(DataType::Struct(Fields::from(fields)))
            }
            // Exhaustive list of SQL types DataFusion does not (yet) support;
            // listed explicitly so new sqlparser variants cause a compile error.
            SQLDataType::Nvarchar(_)
            | SQLDataType::JSON
            | SQLDataType::Uuid
            | SQLDataType::Binary(_)
            | SQLDataType::Varbinary(_)
            | SQLDataType::Blob(_)
            | SQLDataType::Datetime(_)
            | SQLDataType::Regclass
            | SQLDataType::Custom(_, _)
            | SQLDataType::Array(_)
            | SQLDataType::Enum(_, _)
            | SQLDataType::Set(_)
            | SQLDataType::MediumInt(_)
            | SQLDataType::MediumIntUnsigned(_)
            | SQLDataType::Character(_)
            | SQLDataType::CharacterVarying(_)
            | SQLDataType::CharVarying(_)
            | SQLDataType::CharacterLargeObject(_)
            | SQLDataType::CharLargeObject(_)
            | SQLDataType::Timestamp(_, _)
            | SQLDataType::Time(Some(_), _)
            | SQLDataType::Dec(_)
            | SQLDataType::BigNumeric(_)
            | SQLDataType::BigDecimal(_)
            | SQLDataType::Clob(_)
            | SQLDataType::Bytes(_)
            | SQLDataType::Int64
            | SQLDataType::Float64
            | SQLDataType::JSONB
            | SQLDataType::Unspecified
            | SQLDataType::Int16
            | SQLDataType::Int32
            | SQLDataType::Int128
            | SQLDataType::Int256
            | SQLDataType::UInt8
            | SQLDataType::UInt16
            | SQLDataType::UInt32
            | SQLDataType::UInt64
            | SQLDataType::UInt128
            | SQLDataType::UInt256
            | SQLDataType::Float32
            | SQLDataType::Date32
            | SQLDataType::Datetime64(_, _)
            | SQLDataType::FixedString(_)
            | SQLDataType::Map(_, _)
            | SQLDataType::Tuple(_)
            | SQLDataType::Nested(_)
            | SQLDataType::Union(_)
            | SQLDataType::Nullable(_)
            | SQLDataType::LowCardinality(_)
            | SQLDataType::Trigger
            | SQLDataType::TinyBlob
            | SQLDataType::MediumBlob
            | SQLDataType::LongBlob
            | SQLDataType::TinyText
            | SQLDataType::MediumText
            | SQLDataType::LongText
            | SQLDataType::Bit(_)
            | SQLDataType::BitVarying(_)
            | SQLDataType::Signed
            | SQLDataType::SignedInteger
            | SQLDataType::Unsigned
            | SQLDataType::UnsignedInteger
            | SQLDataType::AnyType
            | SQLDataType::Table(_)
            | SQLDataType::VarBit(_)
            | SQLDataType::UTinyInt
            | SQLDataType::USmallInt
            | SQLDataType::HugeInt
            | SQLDataType::UHugeInt
            | SQLDataType::UBigInt
            | SQLDataType::TimestampNtz{..}
            | SQLDataType::NamedTable { .. }
            | SQLDataType::TsVector
            | SQLDataType::TsQuery
            | SQLDataType::GeometricType(_)
            | SQLDataType::DecimalUnsigned(_) | SQLDataType::FloatUnsigned(_) | SQLDataType::RealUnsigned | SQLDataType::DecUnsigned(_) | SQLDataType::DoubleUnsigned(_) | SQLDataType::DoublePrecisionUnsigned => {
                not_impl_err!("Unsupported SQL type {sql_type}")
            }
        }
    }

    /// Converts a SQL object name to a [`TableReference`] using this
    /// planner's identifier-normalization setting.
    pub(crate) fn object_name_to_table_reference(
        &self,
        object_name: ObjectName,
    ) -> Result<TableReference> {
        object_name_to_table_reference(
            object_name,
            self.options.enable_ident_normalization,
        )
    }
}
878
/// Creates a [`TableReference`] from a SQL [`ObjectName`], normalizing
/// identifiers when `enable_normalization` is true.
///
/// Errors if any name part is not a plain identifier, or (via
/// [`idents_to_table_reference`]) if there are more than three parts.
pub fn object_name_to_table_reference(
    object_name: ObjectName,
    enable_normalization: bool,
) -> Result<TableReference> {
    let ObjectName(object_name_parts) = object_name;
    let idents = object_name_parts
        .into_iter()
        .map(|object_name_part| {
            object_name_part.as_ident().cloned().ok_or_else(|| {
                plan_datafusion_err!(
                    "Expected identifier, but found: {:?}",
                    object_name_part
                )
            })
        })
        .collect::<Result<Vec<_>>>()?;
    idents_to_table_reference(idents, enable_normalization)
}
908
/// Consumes identifiers from the end of a compound name, normalizing each as
/// it is taken. Helper for [`idents_to_table_reference`].
struct IdentTaker {
    // Applies (or skips) identifier normalization on each taken ident.
    normalizer: IdentNormalizer,
    // Remaining identifiers; popped from the back, i.e. last part first.
    idents: Vec<Ident>,
}
913
impl IdentTaker {
    /// Wraps `idents` with a normalizer configured by `enable_normalization`.
    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
        Self {
            normalizer: IdentNormalizer::new(enable_normalization),
            idents,
        }
    }

    /// Removes and normalizes the LAST remaining identifier.
    ///
    /// Panics if no identifiers remain; callers check `len()` first.
    fn take(&mut self) -> String {
        let ident = self.idents.pop().expect("no more identifiers");
        self.normalizer.normalize(ident)
    }

    /// Number of identifiers not yet taken.
    fn len(&self) -> usize {
        self.idents.len()
    }
}
934
935impl std::fmt::Display for IdentTaker {
937 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
938 let mut first = true;
939 for ident in self.idents.iter() {
940 if !first {
941 write!(f, ".")?;
942 }
943 write!(f, "{ident}")?;
944 first = false;
945 }
946
947 Ok(())
948 }
949}
950
/// Builds a [`TableReference`] from 1-3 identifiers, applying normalization
/// per `enable_normalization`.
///
/// Identifiers arrive in source order (`catalog.schema.table`); since
/// [`IdentTaker::take`] pops from the back, the table name is taken first,
/// then schema, then catalog.
pub(crate) fn idents_to_table_reference(
    idents: Vec<Ident>,
    enable_normalization: bool,
) -> Result<TableReference> {
    let mut taker = IdentTaker::new(idents, enable_normalization);

    match taker.len() {
        1 => {
            let table = taker.take();
            Ok(TableReference::bare(table))
        }
        2 => {
            let table = taker.take();
            let schema = taker.take();
            Ok(TableReference::partial(schema, table))
        }
        3 => {
            let table = taker.take();
            let schema = taker.take();
            let catalog = taker.take();
            Ok(TableReference::full(catalog, schema, table))
        }
        _ => plan_err!(
            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
            taker,
            taker.len()
        ),
    }
}
981
/// Builds an information-schema filter predicate (e.g.
/// `table_name = 't' AND table_schema = 's'`) from a SQL table name.
///
/// Name parts are consumed in reverse so the last part pairs with
/// `table_name`, the next with `table_schema`, then `table_catalog`.
/// Errors if a part is not a plain identifier.
pub fn object_name_to_qualifier(
    sql_table_name: &ObjectName,
    enable_normalization: bool,
) -> Result<String> {
    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
    let normalizer = IdentNormalizer::new(enable_normalization);
    sql_table_name
        .0
        .iter()
        .rev()
        .zip(columns)
        .map(|(object_name_part, column_name)| {
            object_name_part
                .as_ident()
                .map(|ident| {
                    format!(
                        r#"{} = '{}'"#,
                        column_name,
                        normalizer.normalize(ident.clone())
                    )
                })
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "Expected identifier, but found: {:?}",
                        object_name_part
                    )
                })
        })
        .collect::<Result<Vec<_>>>()
        .map(|parts| parts.join(" AND "))
}