1use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::config::SqlParserOptions;
27use datafusion_common::datatype::{DataTypeExt, FieldExt};
28use datafusion_common::error::add_possible_columns_to_diag;
29use datafusion_common::TableReference;
30use datafusion_common::{
31 field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, Diagnostic,
32 SchemaError,
33};
34use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
35use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
36pub use datafusion_expr::planner::ContextProvider;
37use datafusion_expr::utils::find_column_exprs;
38use datafusion_expr::{col, Expr};
39use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
40use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
41use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
42
43#[derive(Debug, Clone, Copy)]
45pub struct ParserOptions {
46 pub parse_float_as_decimal: bool,
48 pub enable_ident_normalization: bool,
50 pub support_varchar_with_length: bool,
52 pub enable_options_value_normalization: bool,
54 pub collect_spans: bool,
56 pub map_string_types_to_utf8view: bool,
58 pub default_null_ordering: NullOrdering,
60}
61
62impl ParserOptions {
63 pub fn new() -> Self {
74 Self {
75 parse_float_as_decimal: false,
76 enable_ident_normalization: true,
77 support_varchar_with_length: true,
78 map_string_types_to_utf8view: true,
79 enable_options_value_normalization: false,
80 collect_spans: false,
81 default_null_ordering: NullOrdering::NullsMax,
84 }
85 }
86
87 pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
97 self.parse_float_as_decimal = value;
98 self
99 }
100
101 pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
111 self.enable_ident_normalization = value;
112 self
113 }
114
115 pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
117 self.support_varchar_with_length = value;
118 self
119 }
120
121 pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
123 self.map_string_types_to_utf8view = value;
124 self
125 }
126
127 pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
129 self.enable_options_value_normalization = value;
130 self
131 }
132
133 pub fn with_collect_spans(mut self, value: bool) -> Self {
135 self.collect_spans = value;
136 self
137 }
138
139 pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
141 self.default_null_ordering = value;
142 self
143 }
144}
145
146impl Default for ParserOptions {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl From<&SqlParserOptions> for ParserOptions {
153 fn from(options: &SqlParserOptions) -> Self {
154 Self {
155 parse_float_as_decimal: options.parse_float_as_decimal,
156 enable_ident_normalization: options.enable_ident_normalization,
157 support_varchar_with_length: options.support_varchar_with_length,
158 map_string_types_to_utf8view: options.map_string_types_to_utf8view,
159 enable_options_value_normalization: options
160 .enable_options_value_normalization,
161 collect_spans: options.collect_spans,
162 default_null_ordering: options.default_null_ordering.as_str().into(),
163 }
164 }
165}
166
/// Policy deciding whether NULL values are placed before or after
/// non-NULL values in a sort.
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
    /// NULLs behave like the largest value: last when ascending, first
    /// when descending.
    NullsMax,
    /// NULLs behave like the smallest value: first when ascending, last
    /// when descending.
    NullsMin,
    /// NULLs always come first, regardless of direction.
    NullsFirst,
    /// NULLs always come last, regardless of direction.
    NullsLast,
}

impl NullOrdering {
    /// Reports whether NULLs are placed first for a sort in the given
    /// direction (`asc == true` for ascending).
    pub fn nulls_first(&self, asc: bool) -> bool {
        // The only combinations that put NULLs first; everything else
        // puts them last.
        matches!(
            (self, asc),
            (Self::NullsFirst, _) | (Self::NullsMax, false) | (Self::NullsMin, true)
        )
    }
}
195
196impl FromStr for NullOrdering {
197 type Err = DataFusionError;
198
199 fn from_str(s: &str) -> Result<Self> {
200 match s {
201 "nulls_max" => Ok(Self::NullsMax),
202 "nulls_min" => Ok(Self::NullsMin),
203 "nulls_first" => Ok(Self::NullsFirst),
204 "nulls_last" => Ok(Self::NullsLast),
205 _ => plan_err!("Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"),
206 }
207 }
208}
209
210impl From<&str> for NullOrdering {
211 fn from(s: &str) -> Self {
212 Self::from_str(s).unwrap_or(Self::NullsMax)
213 }
214}
215
/// Turns SQL identifiers into strings, optionally normalizing them.
#[derive(Debug)]
pub struct IdentNormalizer {
    /// When `true`, identifiers go through `crate::utils::normalize_ident`;
    /// when `false`, the identifier's raw value is used.
    normalize: bool,
}
221
222impl Default for IdentNormalizer {
223 fn default() -> Self {
224 Self { normalize: true }
225 }
226}
227
228impl IdentNormalizer {
229 pub fn new(normalize: bool) -> Self {
230 Self { normalize }
231 }
232
233 pub fn normalize(&self, ident: Ident) -> String {
234 if self.normalize {
235 crate::utils::normalize_ident(ident)
236 } else {
237 ident.value
238 }
239 }
240}
241
242#[derive(Debug, Clone)]
255pub struct PlannerContext {
256 prepare_param_data_types: Arc<Vec<FieldRef>>,
259 ctes: HashMap<String, Arc<LogicalPlan>>,
262 outer_query_schema: Option<DFSchemaRef>,
264 outer_from_schema: Option<DFSchemaRef>,
267 create_table_schema: Option<DFSchemaRef>,
269}
270
271impl Default for PlannerContext {
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
277impl PlannerContext {
278 pub fn new() -> Self {
280 Self {
281 prepare_param_data_types: Arc::new(vec![]),
282 ctes: HashMap::new(),
283 outer_query_schema: None,
284 outer_from_schema: None,
285 create_table_schema: None,
286 }
287 }
288
289 pub fn with_prepare_param_data_types(
291 mut self,
292 prepare_param_data_types: Vec<FieldRef>,
293 ) -> Self {
294 self.prepare_param_data_types = prepare_param_data_types.into();
295 self
296 }
297
298 pub fn outer_query_schema(&self) -> Option<&DFSchema> {
300 self.outer_query_schema.as_ref().map(|s| s.as_ref())
301 }
302
303 pub fn set_outer_query_schema(
306 &mut self,
307 mut schema: Option<DFSchemaRef>,
308 ) -> Option<DFSchemaRef> {
309 std::mem::swap(&mut self.outer_query_schema, &mut schema);
310 schema
311 }
312
313 pub fn set_table_schema(
314 &mut self,
315 mut schema: Option<DFSchemaRef>,
316 ) -> Option<DFSchemaRef> {
317 std::mem::swap(&mut self.create_table_schema, &mut schema);
318 schema
319 }
320
321 pub fn table_schema(&self) -> Option<DFSchemaRef> {
322 self.create_table_schema.clone()
323 }
324
325 pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
327 self.outer_from_schema.clone()
328 }
329
330 pub fn set_outer_from_schema(
332 &mut self,
333 mut schema: Option<DFSchemaRef>,
334 ) -> Option<DFSchemaRef> {
335 std::mem::swap(&mut self.outer_from_schema, &mut schema);
336 schema
337 }
338
339 pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
341 match self.outer_from_schema.as_mut() {
342 Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
343 None => self.outer_from_schema = Some(Arc::clone(schema)),
344 };
345 Ok(())
346 }
347
348 pub fn prepare_param_data_types(&self) -> &[FieldRef] {
350 &self.prepare_param_data_types
351 }
352
353 pub fn contains_cte(&self, cte_name: &str) -> bool {
356 self.ctes.contains_key(cte_name)
357 }
358
359 pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
362 let cte_name = cte_name.into();
363 self.ctes.insert(cte_name, Arc::new(plan));
364 }
365
366 pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
369 self.ctes.get(cte_name).map(|cte| cte.as_ref())
370 }
371
372 pub(super) fn remove_cte(&mut self, cte_name: &str) {
374 self.ctes.remove(cte_name);
375 }
376}
377
378pub struct SqlToRel<'a, S: ContextProvider> {
398 pub(crate) context_provider: &'a S,
399 pub(crate) options: ParserOptions,
400 pub(crate) ident_normalizer: IdentNormalizer,
401}
402
403impl<'a, S: ContextProvider> SqlToRel<'a, S> {
404 pub fn new(context_provider: &'a S) -> Self {
408 let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
409 Self::new_with_options(context_provider, parser_options)
410 }
411
412 pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
417 let ident_normalize = options.enable_ident_normalization;
418
419 SqlToRel {
420 context_provider,
421 options,
422 ident_normalizer: IdentNormalizer::new(ident_normalize),
423 }
424 }
425
426 pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
427 let mut fields = Vec::with_capacity(columns.len());
428
429 for column in columns {
430 let data_type = self.convert_data_type_to_field(&column.data_type)?;
431 let not_nullable = column
432 .options
433 .iter()
434 .any(|x| x.option == ColumnOption::NotNull);
435 fields.push(
436 data_type
437 .as_ref()
438 .clone()
439 .with_name(self.ident_normalizer.normalize(column.name))
440 .with_nullable(!not_nullable),
441 );
442 }
443
444 Ok(Schema::new(fields))
445 }
446
447 pub(super) fn build_column_defaults(
449 &self,
450 columns: &Vec<SQLColumnDef>,
451 planner_context: &mut PlannerContext,
452 ) -> Result<Vec<(String, Expr)>> {
453 let mut column_defaults = vec![];
454 let empty_schema = DFSchema::empty();
456 let error_desc = |e: DataFusionError| match e {
457 DataFusionError::SchemaError(ref err, _)
458 if matches!(**err, SchemaError::FieldNotFound { .. }) =>
459 {
460 plan_datafusion_err!(
461 "Column reference is not allowed in the DEFAULT expression : {}",
462 e
463 )
464 }
465 _ => e,
466 };
467
468 for column in columns {
469 if let Some(default_sql_expr) =
470 column.options.iter().find_map(|o| match &o.option {
471 ColumnOption::Default(expr) => Some(expr),
472 _ => None,
473 })
474 {
475 let default_expr = self
476 .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
477 .map_err(error_desc)?;
478 column_defaults.push((
479 self.ident_normalizer.normalize(column.name.clone()),
480 default_expr,
481 ));
482 }
483 }
484 Ok(column_defaults)
485 }
486
487 pub(crate) fn apply_table_alias(
489 &self,
490 plan: LogicalPlan,
491 alias: TableAlias,
492 ) -> Result<LogicalPlan> {
493 let idents = alias.columns.into_iter().map(|c| c.name).collect();
494 let plan = self.apply_expr_alias(plan, idents)?;
495
496 LogicalPlanBuilder::from(plan)
497 .alias(TableReference::bare(
498 self.ident_normalizer.normalize(alias.name),
499 ))?
500 .build()
501 }
502
503 pub(crate) fn apply_expr_alias(
504 &self,
505 plan: LogicalPlan,
506 idents: Vec<Ident>,
507 ) -> Result<LogicalPlan> {
508 if idents.is_empty() {
509 Ok(plan)
510 } else if idents.len() != plan.schema().fields().len() {
511 plan_err!(
512 "Source table contains {} columns but only {} \
513 names given as column alias",
514 plan.schema().fields().len(),
515 idents.len()
516 )
517 } else {
518 let fields = plan.schema().fields().clone();
519 LogicalPlanBuilder::from(plan)
520 .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
521 col(field.name()).alias(self.ident_normalizer.normalize(ident))
522 }))?
523 .build()
524 }
525 }
526
527 pub(crate) fn validate_schema_satisfies_exprs(
529 &self,
530 schema: &DFSchema,
531 exprs: &[Expr],
532 ) -> Result<()> {
533 find_column_exprs(exprs)
534 .iter()
535 .try_for_each(|col| match col {
536 Expr::Column(col) => match &col.relation {
537 Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
538 None => {
539 if !schema.fields_with_unqualified_name(&col.name).is_empty() {
540 Ok(())
541 } else {
542 Err(field_not_found(
543 col.relation.clone(),
544 col.name.as_str(),
545 schema,
546 ))
547 }
548 }
549 }
550 .map_err(|err: DataFusionError| match &err {
551 DataFusionError::SchemaError(inner, _)
552 if matches!(
553 inner.as_ref(),
554 SchemaError::FieldNotFound { .. }
555 ) =>
556 {
557 let SchemaError::FieldNotFound {
558 field,
559 valid_fields,
560 } = inner.as_ref()
561 else {
562 unreachable!()
563 };
564 let mut diagnostic = if let Some(relation) = &col.relation {
565 Diagnostic::new_error(
566 format!(
567 "column '{}' not found in '{}'",
568 &col.name, relation
569 ),
570 col.spans().first(),
571 )
572 } else {
573 Diagnostic::new_error(
574 format!("column '{}' not found", &col.name),
575 col.spans().first(),
576 )
577 };
578 add_possible_columns_to_diag(
579 &mut diagnostic,
580 field,
581 valid_fields,
582 );
583 err.with_diagnostic(diagnostic)
584 }
585 _ => err,
586 }),
587 _ => internal_err!("Not a column"),
588 })
589 }
590
591 pub(crate) fn convert_data_type_to_field(
592 &self,
593 sql_type: &SQLDataType,
594 ) -> Result<FieldRef> {
595 if let Some(type_planner) = self.context_provider.get_type_planner() {
597 if let Some(data_type) = type_planner.plan_type(sql_type)? {
598 return Ok(data_type.into_nullable_field_ref());
599 }
600 }
601
602 match sql_type {
604 SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
605 Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
607 }
608 SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
609 inner_sql_type,
610 maybe_array_size,
611 )) => {
612 let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
613 if let Some(array_size) = maybe_array_size {
614 let array_size: i32 = (*array_size).try_into().map_err(|_| {
615 plan_datafusion_err!(
616 "Array size must be a positive 32 bit integer, got {array_size}"
617 )
618 })?;
619 Ok(inner_field.into_fixed_size_list(array_size))
620 } else {
621 Ok(inner_field.into_list())
622 }
623 }
624 SQLDataType::Array(ArrayElemTypeDef::None) => {
625 not_impl_err!("Arrays with unspecified type is not supported")
626 }
627 other => Ok(self
628 .convert_simple_data_type(other)?
629 .into_nullable_field_ref()),
630 }
631 }
632
633 fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
634 match sql_type {
635 SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
636 SQLDataType::TinyInt(_) => Ok(DataType::Int8),
637 SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
638 SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
639 Ok(DataType::Int32)
640 }
641 SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
642 SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
643 SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
644 Ok(DataType::UInt16)
645 }
646 SQLDataType::IntUnsigned(_)
647 | SQLDataType::IntegerUnsigned(_)
648 | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
649 SQLDataType::Varchar(length) => {
650 match (length, self.options.support_varchar_with_length) {
651 (Some(_), false) => plan_err!(
652 "does not support Varchar with length, \
653 please set `support_varchar_with_length` to be true"
654 ),
655 _ => {
656 if self.options.map_string_types_to_utf8view {
657 Ok(DataType::Utf8View)
658 } else {
659 Ok(DataType::Utf8)
660 }
661 }
662 }
663 }
664 SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
665 Ok(DataType::UInt64)
666 }
667 SQLDataType::Float(_) => Ok(DataType::Float32),
668 SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
669 SQLDataType::Double(ExactNumberInfo::None)
670 | SQLDataType::DoublePrecision
671 | SQLDataType::Float8 => Ok(DataType::Float64),
672 SQLDataType::Double(
673 ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
674 ) => {
675 not_impl_err!(
676 "Unsupported SQL type (precision/scale not supported) {sql_type}"
677 )
678 }
679 SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
680 if self.options.map_string_types_to_utf8view {
681 Ok(DataType::Utf8View)
682 } else {
683 Ok(DataType::Utf8)
684 }
685 }
686 SQLDataType::Timestamp(precision, tz_info)
687 if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
688 {
689 let tz = if matches!(tz_info, TimezoneInfo::Tz)
690 || matches!(tz_info, TimezoneInfo::WithTimeZone)
691 {
692 self.context_provider.options().execution.time_zone.clone()
696 } else {
697 None
699 };
700 let precision = match precision {
701 Some(0) => TimeUnit::Second,
702 Some(3) => TimeUnit::Millisecond,
703 Some(6) => TimeUnit::Microsecond,
704 None | Some(9) => TimeUnit::Nanosecond,
705 _ => unreachable!(),
706 };
707 Ok(DataType::Timestamp(precision, tz.map(Into::into)))
708 }
709 SQLDataType::Date => Ok(DataType::Date32),
710 SQLDataType::Time(None, tz_info) => {
711 if matches!(tz_info, TimezoneInfo::None)
712 || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
713 {
714 Ok(DataType::Time64(TimeUnit::Nanosecond))
715 } else {
716 not_impl_err!("Unsupported SQL type {sql_type}")
718 }
719 }
720 SQLDataType::Numeric(exact_number_info)
721 | SQLDataType::Decimal(exact_number_info) => {
722 let (precision, scale) = match *exact_number_info {
723 ExactNumberInfo::None => (None, None),
724 ExactNumberInfo::Precision(precision) => (Some(precision), None),
725 ExactNumberInfo::PrecisionAndScale(precision, scale) => {
726 (Some(precision), Some(scale))
727 }
728 };
729 make_decimal_type(precision, scale.map(|s| s as u64))
730 }
731 SQLDataType::Bytea => Ok(DataType::Binary),
732 SQLDataType::Interval { fields, precision } => {
733 if fields.is_some() || precision.is_some() {
734 return not_impl_err!("Unsupported SQL type {sql_type}");
735 }
736 Ok(DataType::Interval(IntervalUnit::MonthDayNano))
737 }
738 SQLDataType::Struct(fields, _) => {
739 let fields = fields
740 .iter()
741 .enumerate()
742 .map(|(idx, sql_struct_field)| {
743 let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
744 let field_name = match &sql_struct_field.field_name {
745 Some(ident) => ident.clone(),
746 None => Ident::new(format!("c{idx}")),
747 };
748 Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
749 })
750 .collect::<Result<Vec<_>>>()?;
751 Ok(DataType::Struct(Fields::from(fields)))
752 }
753 SQLDataType::Nvarchar(_)
754 | SQLDataType::JSON
755 | SQLDataType::Uuid
756 | SQLDataType::Binary(_)
757 | SQLDataType::Varbinary(_)
758 | SQLDataType::Blob(_)
759 | SQLDataType::Datetime(_)
760 | SQLDataType::Regclass
761 | SQLDataType::Custom(_, _)
762 | SQLDataType::Array(_)
763 | SQLDataType::Enum(_, _)
764 | SQLDataType::Set(_)
765 | SQLDataType::MediumInt(_)
766 | SQLDataType::MediumIntUnsigned(_)
767 | SQLDataType::Character(_)
768 | SQLDataType::CharacterVarying(_)
769 | SQLDataType::CharVarying(_)
770 | SQLDataType::CharacterLargeObject(_)
771 | SQLDataType::CharLargeObject(_)
772 | SQLDataType::Timestamp(_, _)
773 | SQLDataType::Time(Some(_), _)
774 | SQLDataType::Dec(_)
775 | SQLDataType::BigNumeric(_)
776 | SQLDataType::BigDecimal(_)
777 | SQLDataType::Clob(_)
778 | SQLDataType::Bytes(_)
779 | SQLDataType::Int64
780 | SQLDataType::Float64
781 | SQLDataType::JSONB
782 | SQLDataType::Unspecified
783 | SQLDataType::Int16
784 | SQLDataType::Int32
785 | SQLDataType::Int128
786 | SQLDataType::Int256
787 | SQLDataType::UInt8
788 | SQLDataType::UInt16
789 | SQLDataType::UInt32
790 | SQLDataType::UInt64
791 | SQLDataType::UInt128
792 | SQLDataType::UInt256
793 | SQLDataType::Float32
794 | SQLDataType::Date32
795 | SQLDataType::Datetime64(_, _)
796 | SQLDataType::FixedString(_)
797 | SQLDataType::Map(_, _)
798 | SQLDataType::Tuple(_)
799 | SQLDataType::Nested(_)
800 | SQLDataType::Union(_)
801 | SQLDataType::Nullable(_)
802 | SQLDataType::LowCardinality(_)
803 | SQLDataType::Trigger
804 | SQLDataType::TinyBlob
805 | SQLDataType::MediumBlob
806 | SQLDataType::LongBlob
807 | SQLDataType::TinyText
808 | SQLDataType::MediumText
809 | SQLDataType::LongText
810 | SQLDataType::Bit(_)
811 | SQLDataType::BitVarying(_)
812 | SQLDataType::Signed
813 | SQLDataType::SignedInteger
814 | SQLDataType::Unsigned
815 | SQLDataType::UnsignedInteger
816 | SQLDataType::AnyType
817 | SQLDataType::Table(_)
818 | SQLDataType::VarBit(_)
819 | SQLDataType::UTinyInt
820 | SQLDataType::USmallInt
821 | SQLDataType::HugeInt
822 | SQLDataType::UHugeInt
823 | SQLDataType::UBigInt
824 | SQLDataType::TimestampNtz
825 | SQLDataType::NamedTable { .. }
826 | SQLDataType::TsVector
827 | SQLDataType::TsQuery
828 | SQLDataType::GeometricType(_)
829 | SQLDataType::DecimalUnsigned(_) | SQLDataType::FloatUnsigned(_) | SQLDataType::RealUnsigned | SQLDataType::DecUnsigned(_) | SQLDataType::DoubleUnsigned(_) | SQLDataType::DoublePrecisionUnsigned => {
836 not_impl_err!("Unsupported SQL type {sql_type}")
837 }
838 }
839 }
840
841 pub(crate) fn object_name_to_table_reference(
842 &self,
843 object_name: ObjectName,
844 ) -> Result<TableReference> {
845 object_name_to_table_reference(
846 object_name,
847 self.options.enable_ident_normalization,
848 )
849 }
850}
851
852pub fn object_name_to_table_reference(
863 object_name: ObjectName,
864 enable_normalization: bool,
865) -> Result<TableReference> {
866 let ObjectName(object_name_parts) = object_name;
868 let idents = object_name_parts
869 .into_iter()
870 .map(|object_name_part| {
871 object_name_part.as_ident().cloned().ok_or_else(|| {
872 plan_datafusion_err!(
873 "Expected identifier, but found: {:?}",
874 object_name_part
875 )
876 })
877 })
878 .collect::<Result<Vec<_>>>()?;
879 idents_to_table_reference(idents, enable_normalization)
880}
881
882struct IdentTaker {
883 normalizer: IdentNormalizer,
884 idents: Vec<Ident>,
885}
886
887impl IdentTaker {
890 fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
891 Self {
892 normalizer: IdentNormalizer::new(enable_normalization),
893 idents,
894 }
895 }
896
897 fn take(&mut self) -> String {
898 let ident = self.idents.pop().expect("no more identifiers");
899 self.normalizer.normalize(ident)
900 }
901
902 fn len(&self) -> usize {
904 self.idents.len()
905 }
906}
907
908impl std::fmt::Display for IdentTaker {
910 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
911 let mut first = true;
912 for ident in self.idents.iter() {
913 if !first {
914 write!(f, ".")?;
915 }
916 write!(f, "{ident}")?;
917 first = false;
918 }
919
920 Ok(())
921 }
922}
923
924pub(crate) fn idents_to_table_reference(
926 idents: Vec<Ident>,
927 enable_normalization: bool,
928) -> Result<TableReference> {
929 let mut taker = IdentTaker::new(idents, enable_normalization);
930
931 match taker.len() {
932 1 => {
933 let table = taker.take();
934 Ok(TableReference::bare(table))
935 }
936 2 => {
937 let table = taker.take();
938 let schema = taker.take();
939 Ok(TableReference::partial(schema, table))
940 }
941 3 => {
942 let table = taker.take();
943 let schema = taker.take();
944 let catalog = taker.take();
945 Ok(TableReference::full(catalog, schema, table))
946 }
947 _ => plan_err!(
948 "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
949 taker,
950 taker.len()
951 ),
952 }
953}
954
955pub fn object_name_to_qualifier(
958 sql_table_name: &ObjectName,
959 enable_normalization: bool,
960) -> Result<String> {
961 let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
962 let normalizer = IdentNormalizer::new(enable_normalization);
963 sql_table_name
964 .0
965 .iter()
966 .rev()
967 .zip(columns)
968 .map(|(object_name_part, column_name)| {
969 object_name_part
970 .as_ident()
971 .map(|ident| {
972 format!(
973 r#"{} = '{}'"#,
974 column_name,
975 normalizer.normalize(ident.clone())
976 )
977 })
978 .ok_or_else(|| {
979 plan_datafusion_err!(
980 "Expected identifier, but found: {:?}",
981 object_name_part
982 )
983 })
984 })
985 .collect::<Result<Vec<_>>>()
986 .map(|parts| parts.join(" AND "))
987}