1use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::TableReference;
27use datafusion_common::config::SqlParserOptions;
28use datafusion_common::datatype::{DataTypeExt, FieldExt};
29use datafusion_common::error::add_possible_columns_to_diag;
30use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
31use datafusion_common::{
32 DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
33 plan_datafusion_err,
34};
35use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
36pub use datafusion_expr::planner::ContextProvider;
37use datafusion_expr::utils::find_column_exprs;
38use datafusion_expr::{Expr, col};
39use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
40use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
41use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
42
/// Flags controlling how SQL is parsed and planned by [`SqlToRel`].
///
/// Construct with [`ParserOptions::new`] (or `Default`) and adjust with the
/// `with_*` builder methods, or convert from the session's `SqlParserOptions`.
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Whether float literals are planned as decimals (default `false`).
    /// NOTE(review): not read in this section; semantics inferred from the name — confirm.
    pub parse_float_as_decimal: bool,
    /// Whether unquoted identifiers are normalized via `IdentNormalizer`
    /// (default `true`).
    pub enable_ident_normalization: bool,
    /// Whether `VARCHAR(n)` is accepted; when `false`, a length is a planning
    /// error (default `true`).
    pub support_varchar_with_length: bool,
    /// Whether option values are normalized (default `false`).
    /// NOTE(review): not read in this section — confirm semantics at the call site.
    pub enable_options_value_normalization: bool,
    /// Whether source spans are collected (default `false`).
    /// NOTE(review): not read in this section — confirm semantics at the call site.
    pub collect_spans: bool,
    /// Whether SQL string types map to `Utf8View` instead of `Utf8` (default `true`).
    pub map_string_types_to_utf8view: bool,
    /// Null placement applied when an ORDER BY omits NULLS FIRST/LAST
    /// (default [`NullOrdering::NullsMax`]).
    pub default_null_ordering: NullOrdering,
}
61
62impl ParserOptions {
63 pub fn new() -> Self {
74 Self {
75 parse_float_as_decimal: false,
76 enable_ident_normalization: true,
77 support_varchar_with_length: true,
78 map_string_types_to_utf8view: true,
79 enable_options_value_normalization: false,
80 collect_spans: false,
81 default_null_ordering: NullOrdering::NullsMax,
84 }
85 }
86
87 pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
97 self.parse_float_as_decimal = value;
98 self
99 }
100
101 pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
111 self.enable_ident_normalization = value;
112 self
113 }
114
115 pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
117 self.support_varchar_with_length = value;
118 self
119 }
120
121 pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
123 self.map_string_types_to_utf8view = value;
124 self
125 }
126
127 pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
129 self.enable_options_value_normalization = value;
130 self
131 }
132
133 pub fn with_collect_spans(mut self, value: bool) -> Self {
135 self.collect_spans = value;
136 self
137 }
138
139 pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
141 self.default_null_ordering = value;
142 self
143 }
144}
145
146impl Default for ParserOptions {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl From<&SqlParserOptions> for ParserOptions {
153 fn from(options: &SqlParserOptions) -> Self {
154 Self {
155 parse_float_as_decimal: options.parse_float_as_decimal,
156 enable_ident_normalization: options.enable_ident_normalization,
157 support_varchar_with_length: options.support_varchar_with_length,
158 map_string_types_to_utf8view: options.map_string_types_to_utf8view,
159 enable_options_value_normalization: options
160 .enable_options_value_normalization,
161 collect_spans: options.collect_spans,
162 default_null_ordering: options.default_null_ordering.as_str().into(),
163 }
164 }
165}
166
/// Where NULL values are placed when a sort does not say NULLS FIRST/LAST.
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
    /// NULL sorts as if larger than every value: last when ascending,
    /// first when descending.
    NullsMax,
    /// NULL sorts as if smaller than every value: first when ascending,
    /// last when descending.
    NullsMin,
    /// NULL always comes first, regardless of direction.
    NullsFirst,
    /// NULL always comes last, regardless of direction.
    NullsLast,
}

impl NullOrdering {
    /// Returns whether NULLs go first for a sort in the given direction
    /// (`asc == true` for ascending).
    pub fn nulls_first(&self, asc: bool) -> bool {
        match *self {
            // Direction-independent placements.
            Self::NullsFirst => true,
            Self::NullsLast => false,
            // Value-like placements flip with the sort direction.
            Self::NullsMin => asc,
            Self::NullsMax => !asc,
        }
    }
}
195
196impl FromStr for NullOrdering {
197 type Err = DataFusionError;
198
199 fn from_str(s: &str) -> Result<Self> {
200 match s {
201 "nulls_max" => Ok(Self::NullsMax),
202 "nulls_min" => Ok(Self::NullsMin),
203 "nulls_first" => Ok(Self::NullsFirst),
204 "nulls_last" => Ok(Self::NullsLast),
205 _ => plan_err!(
206 "Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"
207 ),
208 }
209 }
210}
211
212impl From<&str> for NullOrdering {
213 fn from(s: &str) -> Self {
214 Self::from_str(s).unwrap_or(Self::NullsMax)
215 }
216}
217
218#[derive(Debug)]
220pub struct IdentNormalizer {
221 normalize: bool,
222}
223
224impl Default for IdentNormalizer {
225 fn default() -> Self {
226 Self { normalize: true }
227 }
228}
229
230impl IdentNormalizer {
231 pub fn new(normalize: bool) -> Self {
232 Self { normalize }
233 }
234
235 pub fn normalize(&self, ident: Ident) -> String {
236 if self.normalize {
237 crate::utils::normalize_ident(ident)
238 } else {
239 ident.value
240 }
241 }
242}
243
/// Mutable state threaded through SQL-to-logical-plan conversion: CTEs in
/// scope, schemas of enclosing queries, prepared-statement parameter types,
/// lambda parameters and a few schema "slots" used by specific statements.
#[derive(Debug, Clone)]
pub struct PlannerContext {
    /// Field types for prepared-statement parameters, set via
    /// `with_prepare_param_data_types`. NOTE(review): a `None` entry presumably
    /// means the parameter type was not specified — confirm. Kept in an `Arc`,
    /// presumably so cloning the context does not copy the vector.
    prepare_param_data_types: Arc<Vec<Option<FieldRef>>>,
    /// Common table expressions visible to the current query, keyed by name.
    ctes: HashMap<String, Arc<LogicalPlan>>,

    /// Schemas of enclosing queries, pushed/popped as subqueries are planned;
    /// the last element is the innermost enclosing query.
    outer_queries_schemas_stack: Vec<DFSchemaRef>,
    /// Schema installed/extended via `set_outer_from_schema` /
    /// `extend_outer_from_schema`. NOTE(review): presumably the FROM-clause
    /// schema visible to nested (e.g. lateral) references — confirm at callers.
    outer_from_schema: Option<DFSchemaRef>,
    /// Schema installed via `set_table_schema`. NOTE(review): presumably the
    /// schema of a table being created — confirm at callers.
    create_table_schema: Option<DFSchemaRef>,
    /// Schema installed via `set_set_expr_left_schema`. NOTE(review): presumably
    /// the left input of a set operation — confirm at callers.
    set_expr_left_schema: Option<DFSchemaRef>,
    /// Lambda parameters in scope, keyed by parameter name.
    lambda_parameters: HashMap<String, FieldRef>,
}
280
281impl Default for PlannerContext {
282 fn default() -> Self {
283 Self::new()
284 }
285}
286
287impl PlannerContext {
288 pub fn new() -> Self {
290 Self {
291 prepare_param_data_types: Arc::new(vec![]),
292 ctes: HashMap::new(),
293 outer_queries_schemas_stack: vec![],
294 outer_from_schema: None,
295 create_table_schema: None,
296 set_expr_left_schema: None,
297 lambda_parameters: HashMap::new(),
298 }
299 }
300
301 pub fn with_prepare_param_data_types(
303 mut self,
304 prepare_param_data_types: Vec<Option<FieldRef>>,
305 ) -> Self {
306 self.prepare_param_data_types = prepare_param_data_types.into();
307 self
308 }
309
310 pub fn outer_queries_schemas(&self) -> &[DFSchemaRef] {
313 &self.outer_queries_schemas_stack
314 }
315
316 pub fn outer_schemas_iter(&self) -> impl Iterator<Item = &DFSchemaRef> {
329 self.outer_queries_schemas_stack.iter().rev()
330 }
331
332 pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
335 self.outer_queries_schemas_stack.push(schema);
336 }
337
338 pub fn latest_outer_query_schema(&self) -> Option<&DFSchemaRef> {
340 self.outer_queries_schemas_stack.last()
341 }
342
343 pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
345 self.outer_queries_schemas_stack.pop()
346 }
347
348 pub fn set_table_schema(
349 &mut self,
350 mut schema: Option<DFSchemaRef>,
351 ) -> Option<DFSchemaRef> {
352 std::mem::swap(&mut self.create_table_schema, &mut schema);
353 schema
354 }
355
356 pub fn table_schema(&self) -> Option<DFSchemaRef> {
357 self.create_table_schema.clone()
358 }
359
360 pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
362 self.outer_from_schema.clone()
363 }
364
365 pub fn set_outer_from_schema(
367 &mut self,
368 mut schema: Option<DFSchemaRef>,
369 ) -> Option<DFSchemaRef> {
370 std::mem::swap(&mut self.outer_from_schema, &mut schema);
371 schema
372 }
373
374 pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
376 match self.outer_from_schema.as_mut() {
377 Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
378 None => self.outer_from_schema = Some(Arc::clone(schema)),
379 };
380 Ok(())
381 }
382
383 pub fn prepare_param_data_types(&self) -> &[Option<FieldRef>] {
385 &self.prepare_param_data_types
386 }
387
388 pub fn contains_cte(&self, cte_name: &str) -> bool {
391 self.ctes.contains_key(cte_name)
392 }
393
394 pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
397 let cte_name = cte_name.into();
398 self.ctes.insert(cte_name, Arc::new(plan));
399 }
400
401 pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
404 self.ctes.get(cte_name).map(|cte| cte.as_ref())
405 }
406
407 pub fn lambda_parameters(&self) -> &HashMap<String, FieldRef> {
408 &self.lambda_parameters
409 }
410
411 pub fn with_lambda_parameters(
412 mut self,
413 parameters: impl IntoIterator<Item = FieldRef>,
414 ) -> Self {
415 self.lambda_parameters
416 .extend(parameters.into_iter().map(|f| (f.name().clone(), f)));
417
418 self
419 }
420
421 pub(super) fn remove_cte(&mut self, cte_name: &str) {
423 self.ctes.remove(cte_name);
424 }
425
426 pub(super) fn set_set_expr_left_schema(
428 &mut self,
429 schema: Option<DFSchemaRef>,
430 ) -> Option<DFSchemaRef> {
431 std::mem::replace(&mut self.set_expr_left_schema, schema)
432 }
433}
434
/// Converts `sqlparser` AST nodes into DataFusion logical plans, expressions
/// and Arrow schemas.
pub struct SqlToRel<'a, S: ContextProvider> {
    /// Supplies session configuration (`options()`) and an optional type
    /// planner; presumably also catalog lookups used by other planner modules.
    pub(crate) context_provider: &'a S,
    /// Parser/planner behavior flags.
    pub(crate) options: ParserOptions,
    /// Identifier normalizer built from `options.enable_ident_normalization`.
    pub(crate) ident_normalizer: IdentNormalizer,
}
459
460impl<'a, S: ContextProvider> SqlToRel<'a, S> {
461 pub fn new(context_provider: &'a S) -> Self {
465 let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
466 Self::new_with_options(context_provider, parser_options)
467 }
468
469 pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
474 let ident_normalize = options.enable_ident_normalization;
475
476 SqlToRel {
477 context_provider,
478 options,
479 ident_normalizer: IdentNormalizer::new(ident_normalize),
480 }
481 }
482
483 pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
484 let mut fields = Vec::with_capacity(columns.len());
485
486 for column in columns {
487 let data_type = self.convert_data_type_to_field(&column.data_type)?;
488 let not_nullable = column
489 .options
490 .iter()
491 .any(|x| x.option == ColumnOption::NotNull);
492 fields.push(
493 data_type
494 .as_ref()
495 .clone()
496 .with_name(self.ident_normalizer.normalize(column.name))
497 .with_nullable(!not_nullable),
498 );
499 }
500
501 Ok(Schema::new(fields))
502 }
503
504 pub(super) fn build_column_defaults(
506 &self,
507 columns: &Vec<SQLColumnDef>,
508 planner_context: &mut PlannerContext,
509 ) -> Result<Vec<(String, Expr)>> {
510 let mut column_defaults = vec![];
511 let empty_schema = DFSchema::empty();
513 let error_desc = |e: DataFusionError| match e {
514 DataFusionError::SchemaError(ref err, _)
515 if matches!(**err, SchemaError::FieldNotFound { .. }) =>
516 {
517 plan_datafusion_err!(
518 "Column reference is not allowed in the DEFAULT expression : {}",
519 e
520 )
521 }
522 _ => e,
523 };
524
525 for column in columns {
526 if let Some(default_sql_expr) =
527 column.options.iter().find_map(|o| match &o.option {
528 ColumnOption::Default(expr) => Some(expr),
529 _ => None,
530 })
531 {
532 let default_expr = self
533 .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
534 .map_err(error_desc)?;
535 column_defaults.push((
536 self.ident_normalizer.normalize(column.name.clone()),
537 default_expr,
538 ));
539 }
540 }
541 Ok(column_defaults)
542 }
543
544 pub(crate) fn apply_table_alias(
546 &self,
547 plan: LogicalPlan,
548 alias: TableAlias,
549 ) -> Result<LogicalPlan> {
550 let idents = alias.columns.into_iter().map(|c| c.name).collect();
551 let plan = self.apply_expr_alias(plan, idents)?;
552
553 LogicalPlanBuilder::from(plan)
554 .alias(TableReference::bare(
555 self.ident_normalizer.normalize(alias.name),
556 ))?
557 .build()
558 }
559
560 pub(crate) fn apply_expr_alias(
561 &self,
562 plan: LogicalPlan,
563 idents: Vec<Ident>,
564 ) -> Result<LogicalPlan> {
565 if idents.is_empty() {
566 Ok(plan)
567 } else if idents.len() != plan.schema().fields().len() {
568 plan_err!(
569 "Source table contains {} columns but only {} \
570 names given as column alias",
571 plan.schema().fields().len(),
572 idents.len()
573 )
574 } else {
575 let fields = plan.schema().fields().clone();
576 LogicalPlanBuilder::from(plan)
577 .project(fields.iter().zip(idents).map(|(field, ident)| {
578 col(field.name()).alias(self.ident_normalizer.normalize(ident))
579 }))?
580 .build()
581 }
582 }
583
584 pub(crate) fn validate_schema_satisfies_exprs(
586 &self,
587 schema: &DFSchema,
588 exprs: &[Expr],
589 ) -> Result<()> {
590 find_column_exprs(exprs)
591 .iter()
592 .try_for_each(|col| match col {
593 Expr::Column(col) => match &col.relation {
594 Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
595 None => {
596 if !schema.fields_with_unqualified_name(&col.name).is_empty() {
597 Ok(())
598 } else {
599 Err(field_not_found(
600 col.relation.clone(),
601 col.name.as_str(),
602 schema,
603 ))
604 }
605 }
606 }
607 .map_err(|err: DataFusionError| match &err {
608 DataFusionError::SchemaError(inner, _)
609 if matches!(
610 inner.as_ref(),
611 SchemaError::FieldNotFound { .. }
612 ) =>
613 {
614 let SchemaError::FieldNotFound {
615 field,
616 valid_fields,
617 } = inner.as_ref()
618 else {
619 unreachable!()
620 };
621 let mut diagnostic = if let Some(relation) = &col.relation {
622 Diagnostic::new_error(
623 format!(
624 "column '{}' not found in '{}'",
625 &col.name, relation
626 ),
627 col.spans().first(),
628 )
629 } else {
630 Diagnostic::new_error(
631 format!("column '{}' not found", &col.name),
632 col.spans().first(),
633 )
634 };
635 add_possible_columns_to_diag(
636 &mut diagnostic,
637 field,
638 valid_fields,
639 );
640 err.with_diagnostic(diagnostic)
641 }
642 _ => err,
643 }),
644 _ => internal_err!("Not a column"),
645 })
646 }
647
648 pub(crate) fn convert_data_type_to_field(
649 &self,
650 sql_type: &SQLDataType,
651 ) -> Result<FieldRef> {
652 if let Some(type_planner) = self.context_provider.get_type_planner()
654 && let Some(data_type) = type_planner.plan_type_field(sql_type)?
655 {
656 return Ok(data_type);
657 }
658
659 match sql_type {
661 SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
662 Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
664 }
665 SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
666 inner_sql_type,
667 maybe_array_size,
668 )) => {
669 let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
670 if let Some(array_size) = maybe_array_size {
671 let array_size: i32 = (*array_size).try_into().map_err(|_| {
672 plan_datafusion_err!(
673 "Array size must be a positive 32 bit integer, got {array_size}"
674 )
675 })?;
676 Ok(inner_field.into_fixed_size_list(array_size))
677 } else {
678 Ok(inner_field.into_list())
679 }
680 }
681 SQLDataType::Array(ArrayElemTypeDef::None) => {
682 not_impl_err!("Arrays with unspecified type is not supported")
683 }
684 other => Ok(self
685 .convert_simple_data_type(other)?
686 .into_nullable_field_ref()),
687 }
688 }
689
    /// Maps a SQL type (other than the bracketed `ARRAY` forms handled by
    /// `convert_data_type_to_field`) to an Arrow [`DataType`].
    ///
    /// Notable mappings:
    /// - String types (`VARCHAR`, `CHAR`, `TEXT`, `STRING`) become `Utf8View`
    ///   when `map_string_types_to_utf8view` is set, else `Utf8`.
    /// - `TIMESTAMP(p)` supports p in {0, 3, 6, 9} (none = nanoseconds).
    /// - `TIMESTAMPTZ` / `WITH TIME ZONE` takes the session's
    ///   `execution.time_zone`.
    /// - Struct fields without a name are named `c{index}`.
    ///
    /// # Errors
    /// - Planning error for `VARCHAR(n)` when `support_varchar_with_length` is false.
    /// - Not-implemented error for any type in the final catch-all arm. This
    ///   includes `DOUBLE` with precision/scale, other timestamp precisions,
    ///   `TIME` with a precision or a time zone, and `INTERVAL` with fields or
    ///   precision.
    /// - Errors from `make_decimal_type` for `NUMERIC`/`DECIMAL`.
    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
        match sql_type {
            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
                Ok(DataType::Int32)
            }
            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
                Ok(DataType::UInt16)
            }
            SQLDataType::IntUnsigned(_)
            | SQLDataType::IntegerUnsigned(_)
            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
            SQLDataType::Varchar(length) => {
                // A declared length is accepted but ignored when allowed.
                match (length, self.options.support_varchar_with_length) {
                    (Some(_), false) => plan_err!(
                        "does not support Varchar with length, \
                    please set `support_varchar_with_length` to be true"
                    ),
                    _ => {
                        if self.options.map_string_types_to_utf8view {
                            Ok(DataType::Utf8View)
                        } else {
                            Ok(DataType::Utf8)
                        }
                    }
                }
            }
            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
                Ok(DataType::UInt64)
            }
            SQLDataType::Float(_) => Ok(DataType::Float32),
            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
            SQLDataType::Double(ExactNumberInfo::None)
            | SQLDataType::DoublePrecision
            | SQLDataType::Float8 => Ok(DataType::Float64),
            SQLDataType::Double(
                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
            ) => {
                not_impl_err!(
                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
                )
            }
            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
                if self.options.map_string_types_to_utf8view {
                    Ok(DataType::Utf8View)
                } else {
                    Ok(DataType::Utf8)
                }
            }
            // Other precisions fall through to the `Timestamp(_, _)` entry in the
            // unsupported catch-all below.
            SQLDataType::Timestamp(precision, tz_info)
                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
            {
                // Time-zone-aware timestamps use the session's configured zone
                // (which may itself be unset).
                let tz = if *tz_info == TimezoneInfo::Tz
                    || *tz_info == TimezoneInfo::WithTimeZone
                {
                    self.context_provider.options().execution.time_zone.clone()
                } else {
                    None
                };
                let precision = match precision {
                    Some(0) => TimeUnit::Second,
                    Some(3) => TimeUnit::Millisecond,
                    Some(6) => TimeUnit::Microsecond,
                    None | Some(9) => TimeUnit::Nanosecond,
                    // Excluded by the match guard above.
                    _ => unreachable!(),
                };
                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
            }
            SQLDataType::Date => Ok(DataType::Date32),
            SQLDataType::Time(None, tz_info) => {
                if *tz_info == TimezoneInfo::None
                    || *tz_info == TimezoneInfo::WithoutTimeZone
                {
                    Ok(DataType::Time64(TimeUnit::Nanosecond))
                } else {
                    not_impl_err!("Unsupported SQL type {sql_type}")
                }
            }
            SQLDataType::Numeric(exact_number_info)
            | SQLDataType::Decimal(exact_number_info) => {
                let (precision, scale) = match *exact_number_info {
                    ExactNumberInfo::None => (None, None),
                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
                        (Some(precision), Some(scale))
                    }
                };
                // NOTE(review): if sqlparser's scale is signed, `as u64` wraps
                // negative scales; confirm make_decimal_type narrows it back
                // consistently before changing this cast.
                make_decimal_type(precision, scale.map(|s| s as u64))
            }
            SQLDataType::Bytea => Ok(DataType::Binary),
            SQLDataType::Interval { fields, precision } => {
                if fields.is_some() || precision.is_some() {
                    return not_impl_err!("Unsupported SQL type {sql_type}");
                }
                Ok(DataType::Interval(IntervalUnit::MonthDayNano))
            }
            SQLDataType::Struct(fields, _) => {
                let fields = fields
                    .iter()
                    .enumerate()
                    .map(|(idx, sql_struct_field)| {
                        let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
                        // Unnamed struct members get positional names c0, c1, ...
                        let field_name = match &sql_struct_field.field_name {
                            Some(ident) => ident.clone(),
                            None => Ident::new(format!("c{idx}")),
                        };
                        Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(DataType::Struct(Fields::from(fields)))
            }
            // Exhaustive list of unsupported types, so a new sqlparser variant
            // forces a compile error here instead of being silently mis-handled.
            SQLDataType::Nvarchar(_)
            | SQLDataType::JSON
            | SQLDataType::Uuid
            | SQLDataType::Binary(_)
            | SQLDataType::Varbinary(_)
            | SQLDataType::Blob(_)
            | SQLDataType::Datetime(_)
            | SQLDataType::Regclass
            | SQLDataType::Custom(_, _)
            | SQLDataType::Array(_)
            | SQLDataType::Enum(_, _)
            | SQLDataType::Set(_)
            | SQLDataType::MediumInt(_)
            | SQLDataType::MediumIntUnsigned(_)
            | SQLDataType::Character(_)
            | SQLDataType::CharacterVarying(_)
            | SQLDataType::CharVarying(_)
            | SQLDataType::CharacterLargeObject(_)
            | SQLDataType::CharLargeObject(_)
            | SQLDataType::Timestamp(_, _)
            | SQLDataType::Time(Some(_), _)
            | SQLDataType::Dec(_)
            | SQLDataType::BigNumeric(_)
            | SQLDataType::BigDecimal(_)
            | SQLDataType::Clob(_)
            | SQLDataType::Bytes(_)
            | SQLDataType::Int64
            | SQLDataType::Float64
            | SQLDataType::JSONB
            | SQLDataType::Unspecified
            | SQLDataType::Int16
            | SQLDataType::Int32
            | SQLDataType::Int128
            | SQLDataType::Int256
            | SQLDataType::UInt8
            | SQLDataType::UInt16
            | SQLDataType::UInt32
            | SQLDataType::UInt64
            | SQLDataType::UInt128
            | SQLDataType::UInt256
            | SQLDataType::Float32
            | SQLDataType::Date32
            | SQLDataType::Datetime64(_, _)
            | SQLDataType::FixedString(_)
            | SQLDataType::Map(_, _)
            | SQLDataType::Tuple(_)
            | SQLDataType::Nested(_)
            | SQLDataType::Union(_)
            | SQLDataType::Nullable(_)
            | SQLDataType::LowCardinality(_)
            | SQLDataType::Trigger
            | SQLDataType::TinyBlob
            | SQLDataType::MediumBlob
            | SQLDataType::LongBlob
            | SQLDataType::TinyText
            | SQLDataType::MediumText
            | SQLDataType::LongText
            | SQLDataType::Bit(_)
            | SQLDataType::BitVarying(_)
            | SQLDataType::Signed
            | SQLDataType::SignedInteger
            | SQLDataType::Unsigned
            | SQLDataType::UnsignedInteger
            | SQLDataType::AnyType
            | SQLDataType::Table(_)
            | SQLDataType::VarBit(_)
            | SQLDataType::UTinyInt
            | SQLDataType::USmallInt
            | SQLDataType::HugeInt
            | SQLDataType::UHugeInt
            | SQLDataType::UBigInt
            | SQLDataType::TimestampNtz{..}
            | SQLDataType::NamedTable { .. }
            | SQLDataType::TsVector
            | SQLDataType::TsQuery
            | SQLDataType::GeometricType(_)
            | SQLDataType::DecimalUnsigned(_)
            | SQLDataType::FloatUnsigned(_)
            | SQLDataType::RealUnsigned
            | SQLDataType::DecUnsigned(_)
            | SQLDataType::DoubleUnsigned(_)
            | SQLDataType::DoublePrecisionUnsigned => {
                not_impl_err!("Unsupported SQL type {sql_type}")
            }
        }
    }
897
898 pub(crate) fn object_name_to_table_reference(
899 &self,
900 object_name: ObjectName,
901 ) -> Result<TableReference> {
902 object_name_to_table_reference(
903 object_name,
904 self.options.enable_ident_normalization,
905 )
906 }
907}
908
909pub fn object_name_to_table_reference(
920 object_name: ObjectName,
921 enable_normalization: bool,
922) -> Result<TableReference> {
923 let ObjectName(object_name_parts) = object_name;
925 let idents = object_name_parts
926 .into_iter()
927 .map(|object_name_part| {
928 object_name_part.as_ident().cloned().ok_or_else(|| {
929 plan_datafusion_err!(
930 "Expected identifier, but found: {:?}",
931 object_name_part
932 )
933 })
934 })
935 .collect::<Result<Vec<_>>>()?;
936 idents_to_table_reference(idents, enable_normalization)
937}
938
939struct IdentTaker {
940 normalizer: IdentNormalizer,
941 idents: Vec<Ident>,
942}
943
944impl IdentTaker {
947 fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
948 Self {
949 normalizer: IdentNormalizer::new(enable_normalization),
950 idents,
951 }
952 }
953
954 fn take(&mut self) -> String {
955 let ident = self.idents.pop().expect("no more identifiers");
956 self.normalizer.normalize(ident)
957 }
958
959 fn len(&self) -> usize {
961 self.idents.len()
962 }
963}
964
965impl std::fmt::Display for IdentTaker {
967 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
968 let mut first = true;
969 for ident in self.idents.iter() {
970 if !first {
971 write!(f, ".")?;
972 }
973 write!(f, "{ident}")?;
974 first = false;
975 }
976
977 Ok(())
978 }
979}
980
981pub(crate) fn idents_to_table_reference(
983 idents: Vec<Ident>,
984 enable_normalization: bool,
985) -> Result<TableReference> {
986 let mut taker = IdentTaker::new(idents, enable_normalization);
987
988 match taker.len() {
989 1 => {
990 let table = taker.take();
991 Ok(TableReference::bare(table))
992 }
993 2 => {
994 let table = taker.take();
995 let schema = taker.take();
996 Ok(TableReference::partial(schema, table))
997 }
998 3 => {
999 let table = taker.take();
1000 let schema = taker.take();
1001 let catalog = taker.take();
1002 Ok(TableReference::full(catalog, schema, table))
1003 }
1004 _ => plan_err!(
1005 "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
1006 taker,
1007 taker.len()
1008 ),
1009 }
1010}
1011
1012pub fn object_name_to_qualifier(
1015 sql_table_name: &ObjectName,
1016 enable_normalization: bool,
1017) -> Result<String> {
1018 let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
1019 let normalizer = IdentNormalizer::new(enable_normalization);
1020 sql_table_name
1021 .0
1022 .iter()
1023 .rev()
1024 .zip(columns)
1025 .map(|(object_name_part, column_name)| {
1026 object_name_part
1027 .as_ident()
1028 .map(|ident| {
1029 format!(
1030 r#"{} = '{}'"#,
1031 column_name,
1032 normalizer.normalize(ident.clone())
1033 )
1034 })
1035 .ok_or_else(|| {
1036 plan_datafusion_err!(
1037 "Expected identifier, but found: {:?}",
1038 object_name_part
1039 )
1040 })
1041 })
1042 .collect::<Result<Vec<_>>>()
1043 .map(|parts| parts.join(" AND "))
1044}