1use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::TableReference;
27use datafusion_common::config::SqlParserOptions;
28use datafusion_common::datatype::{DataTypeExt, FieldExt};
29use datafusion_common::error::add_possible_columns_to_diag;
30use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
31use datafusion_common::{
32 DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
33 plan_datafusion_err,
34};
35use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
36pub use datafusion_expr::planner::ContextProvider;
37use datafusion_expr::utils::find_column_exprs;
38use datafusion_expr::{Expr, col};
39use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
40use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
41use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
42
43#[derive(Debug, Clone, Copy)]
45pub struct ParserOptions {
46 pub parse_float_as_decimal: bool,
48 pub enable_ident_normalization: bool,
50 pub support_varchar_with_length: bool,
52 pub enable_options_value_normalization: bool,
54 pub collect_spans: bool,
56 pub map_string_types_to_utf8view: bool,
58 pub default_null_ordering: NullOrdering,
60}
61
62impl ParserOptions {
63 pub fn new() -> Self {
74 Self {
75 parse_float_as_decimal: false,
76 enable_ident_normalization: true,
77 support_varchar_with_length: true,
78 map_string_types_to_utf8view: true,
79 enable_options_value_normalization: false,
80 collect_spans: false,
81 default_null_ordering: NullOrdering::NullsMax,
84 }
85 }
86
87 pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
97 self.parse_float_as_decimal = value;
98 self
99 }
100
101 pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
111 self.enable_ident_normalization = value;
112 self
113 }
114
115 pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
117 self.support_varchar_with_length = value;
118 self
119 }
120
121 pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
123 self.map_string_types_to_utf8view = value;
124 self
125 }
126
127 pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
129 self.enable_options_value_normalization = value;
130 self
131 }
132
133 pub fn with_collect_spans(mut self, value: bool) -> Self {
135 self.collect_spans = value;
136 self
137 }
138
139 pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
141 self.default_null_ordering = value;
142 self
143 }
144}
145
146impl Default for ParserOptions {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl From<&SqlParserOptions> for ParserOptions {
153 fn from(options: &SqlParserOptions) -> Self {
154 Self {
155 parse_float_as_decimal: options.parse_float_as_decimal,
156 enable_ident_normalization: options.enable_ident_normalization,
157 support_varchar_with_length: options.support_varchar_with_length,
158 map_string_types_to_utf8view: options.map_string_types_to_utf8view,
159 enable_options_value_normalization: options
160 .enable_options_value_normalization,
161 collect_spans: options.collect_spans,
162 default_null_ordering: options.default_null_ordering.as_str().into(),
163 }
164 }
165}
166
/// Policy deciding whether NULLs sort before or after non-NULL values.
///
/// Use [`NullOrdering::nulls_first`] to resolve the policy for a given sort
/// direction. The type derives `PartialEq`/`Eq`/`Hash` so that configured
/// values can be compared, asserted on in tests, and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NullOrdering {
    /// NULLs behave like the largest value: last when ascending, first when
    /// descending.
    NullsMax,
    /// NULLs behave like the smallest value: first when ascending, last when
    /// descending.
    NullsMin,
    /// NULLs always come first, regardless of direction.
    NullsFirst,
    /// NULLs always come last, regardless of direction.
    NullsLast,
}
179
180impl NullOrdering {
181 pub fn nulls_first(&self, asc: bool) -> bool {
187 match self {
188 Self::NullsMax => !asc,
189 Self::NullsMin => asc,
190 Self::NullsFirst => true,
191 Self::NullsLast => false,
192 }
193 }
194}
195
196impl FromStr for NullOrdering {
197 type Err = DataFusionError;
198
199 fn from_str(s: &str) -> Result<Self> {
200 match s {
201 "nulls_max" => Ok(Self::NullsMax),
202 "nulls_min" => Ok(Self::NullsMin),
203 "nulls_first" => Ok(Self::NullsFirst),
204 "nulls_last" => Ok(Self::NullsLast),
205 _ => plan_err!(
206 "Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"
207 ),
208 }
209 }
210}
211
212impl From<&str> for NullOrdering {
213 fn from(s: &str) -> Self {
214 Self::from_str(s).unwrap_or(Self::NullsMax)
215 }
216}
217
/// Converts SQL identifiers to plain strings, optionally normalizing them.
#[derive(Debug)]
pub struct IdentNormalizer {
    /// `true` routes identifiers through the crate's `normalize_ident`;
    /// `false` keeps the identifier text verbatim.
    normalize: bool,
}
223
224impl Default for IdentNormalizer {
225 fn default() -> Self {
226 Self { normalize: true }
227 }
228}
229
230impl IdentNormalizer {
231 pub fn new(normalize: bool) -> Self {
232 Self { normalize }
233 }
234
235 pub fn normalize(&self, ident: Ident) -> String {
236 if self.normalize {
237 crate::utils::normalize_ident(ident)
238 } else {
239 ident.value
240 }
241 }
242}
243
244#[derive(Debug, Clone)]
257pub struct PlannerContext {
258 prepare_param_data_types: Arc<Vec<FieldRef>>,
261 ctes: HashMap<String, Arc<LogicalPlan>>,
264 outer_query_schema: Option<DFSchemaRef>,
266 outer_from_schema: Option<DFSchemaRef>,
269 create_table_schema: Option<DFSchemaRef>,
271}
272
273impl Default for PlannerContext {
274 fn default() -> Self {
275 Self::new()
276 }
277}
278
279impl PlannerContext {
280 pub fn new() -> Self {
282 Self {
283 prepare_param_data_types: Arc::new(vec![]),
284 ctes: HashMap::new(),
285 outer_query_schema: None,
286 outer_from_schema: None,
287 create_table_schema: None,
288 }
289 }
290
291 pub fn with_prepare_param_data_types(
293 mut self,
294 prepare_param_data_types: Vec<FieldRef>,
295 ) -> Self {
296 self.prepare_param_data_types = prepare_param_data_types.into();
297 self
298 }
299
300 pub fn outer_query_schema(&self) -> Option<&DFSchema> {
302 self.outer_query_schema.as_ref().map(|s| s.as_ref())
303 }
304
305 pub fn set_outer_query_schema(
308 &mut self,
309 mut schema: Option<DFSchemaRef>,
310 ) -> Option<DFSchemaRef> {
311 std::mem::swap(&mut self.outer_query_schema, &mut schema);
312 schema
313 }
314
315 pub fn set_table_schema(
316 &mut self,
317 mut schema: Option<DFSchemaRef>,
318 ) -> Option<DFSchemaRef> {
319 std::mem::swap(&mut self.create_table_schema, &mut schema);
320 schema
321 }
322
323 pub fn table_schema(&self) -> Option<DFSchemaRef> {
324 self.create_table_schema.clone()
325 }
326
327 pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
329 self.outer_from_schema.clone()
330 }
331
332 pub fn set_outer_from_schema(
334 &mut self,
335 mut schema: Option<DFSchemaRef>,
336 ) -> Option<DFSchemaRef> {
337 std::mem::swap(&mut self.outer_from_schema, &mut schema);
338 schema
339 }
340
341 pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
343 match self.outer_from_schema.as_mut() {
344 Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
345 None => self.outer_from_schema = Some(Arc::clone(schema)),
346 };
347 Ok(())
348 }
349
350 pub fn prepare_param_data_types(&self) -> &[FieldRef] {
352 &self.prepare_param_data_types
353 }
354
355 pub fn contains_cte(&self, cte_name: &str) -> bool {
358 self.ctes.contains_key(cte_name)
359 }
360
361 pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
364 let cte_name = cte_name.into();
365 self.ctes.insert(cte_name, Arc::new(plan));
366 }
367
368 pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
371 self.ctes.get(cte_name).map(|cte| cte.as_ref())
372 }
373
374 pub(super) fn remove_cte(&mut self, cte_name: &str) {
376 self.ctes.remove(cte_name);
377 }
378}
379
380pub struct SqlToRel<'a, S: ContextProvider> {
400 pub(crate) context_provider: &'a S,
401 pub(crate) options: ParserOptions,
402 pub(crate) ident_normalizer: IdentNormalizer,
403}
404
405impl<'a, S: ContextProvider> SqlToRel<'a, S> {
406 pub fn new(context_provider: &'a S) -> Self {
410 let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
411 Self::new_with_options(context_provider, parser_options)
412 }
413
414 pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
419 let ident_normalize = options.enable_ident_normalization;
420
421 SqlToRel {
422 context_provider,
423 options,
424 ident_normalizer: IdentNormalizer::new(ident_normalize),
425 }
426 }
427
428 pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
429 let mut fields = Vec::with_capacity(columns.len());
430
431 for column in columns {
432 let data_type = self.convert_data_type_to_field(&column.data_type)?;
433 let not_nullable = column
434 .options
435 .iter()
436 .any(|x| x.option == ColumnOption::NotNull);
437 fields.push(
438 data_type
439 .as_ref()
440 .clone()
441 .with_name(self.ident_normalizer.normalize(column.name))
442 .with_nullable(!not_nullable),
443 );
444 }
445
446 Ok(Schema::new(fields))
447 }
448
449 pub(super) fn build_column_defaults(
451 &self,
452 columns: &Vec<SQLColumnDef>,
453 planner_context: &mut PlannerContext,
454 ) -> Result<Vec<(String, Expr)>> {
455 let mut column_defaults = vec![];
456 let empty_schema = DFSchema::empty();
458 let error_desc = |e: DataFusionError| match e {
459 DataFusionError::SchemaError(ref err, _)
460 if matches!(**err, SchemaError::FieldNotFound { .. }) =>
461 {
462 plan_datafusion_err!(
463 "Column reference is not allowed in the DEFAULT expression : {}",
464 e
465 )
466 }
467 _ => e,
468 };
469
470 for column in columns {
471 if let Some(default_sql_expr) =
472 column.options.iter().find_map(|o| match &o.option {
473 ColumnOption::Default(expr) => Some(expr),
474 _ => None,
475 })
476 {
477 let default_expr = self
478 .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
479 .map_err(error_desc)?;
480 column_defaults.push((
481 self.ident_normalizer.normalize(column.name.clone()),
482 default_expr,
483 ));
484 }
485 }
486 Ok(column_defaults)
487 }
488
489 pub(crate) fn apply_table_alias(
491 &self,
492 plan: LogicalPlan,
493 alias: TableAlias,
494 ) -> Result<LogicalPlan> {
495 let idents = alias.columns.into_iter().map(|c| c.name).collect();
496 let plan = self.apply_expr_alias(plan, idents)?;
497
498 LogicalPlanBuilder::from(plan)
499 .alias(TableReference::bare(
500 self.ident_normalizer.normalize(alias.name),
501 ))?
502 .build()
503 }
504
505 pub(crate) fn apply_expr_alias(
506 &self,
507 plan: LogicalPlan,
508 idents: Vec<Ident>,
509 ) -> Result<LogicalPlan> {
510 if idents.is_empty() {
511 Ok(plan)
512 } else if idents.len() != plan.schema().fields().len() {
513 plan_err!(
514 "Source table contains {} columns but only {} \
515 names given as column alias",
516 plan.schema().fields().len(),
517 idents.len()
518 )
519 } else {
520 let fields = plan.schema().fields().clone();
521 LogicalPlanBuilder::from(plan)
522 .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
523 col(field.name()).alias(self.ident_normalizer.normalize(ident))
524 }))?
525 .build()
526 }
527 }
528
529 pub(crate) fn validate_schema_satisfies_exprs(
531 &self,
532 schema: &DFSchema,
533 exprs: &[Expr],
534 ) -> Result<()> {
535 find_column_exprs(exprs)
536 .iter()
537 .try_for_each(|col| match col {
538 Expr::Column(col) => match &col.relation {
539 Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
540 None => {
541 if !schema.fields_with_unqualified_name(&col.name).is_empty() {
542 Ok(())
543 } else {
544 Err(field_not_found(
545 col.relation.clone(),
546 col.name.as_str(),
547 schema,
548 ))
549 }
550 }
551 }
552 .map_err(|err: DataFusionError| match &err {
553 DataFusionError::SchemaError(inner, _)
554 if matches!(
555 inner.as_ref(),
556 SchemaError::FieldNotFound { .. }
557 ) =>
558 {
559 let SchemaError::FieldNotFound {
560 field,
561 valid_fields,
562 } = inner.as_ref()
563 else {
564 unreachable!()
565 };
566 let mut diagnostic = if let Some(relation) = &col.relation {
567 Diagnostic::new_error(
568 format!(
569 "column '{}' not found in '{}'",
570 &col.name, relation
571 ),
572 col.spans().first(),
573 )
574 } else {
575 Diagnostic::new_error(
576 format!("column '{}' not found", &col.name),
577 col.spans().first(),
578 )
579 };
580 add_possible_columns_to_diag(
581 &mut diagnostic,
582 field,
583 valid_fields,
584 );
585 err.with_diagnostic(diagnostic)
586 }
587 _ => err,
588 }),
589 _ => internal_err!("Not a column"),
590 })
591 }
592
593 pub(crate) fn convert_data_type_to_field(
594 &self,
595 sql_type: &SQLDataType,
596 ) -> Result<FieldRef> {
597 if let Some(type_planner) = self.context_provider.get_type_planner()
599 && let Some(data_type) = type_planner.plan_type(sql_type)?
600 {
601 return Ok(data_type.into_nullable_field_ref());
602 }
603
604 match sql_type {
606 SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
607 Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
609 }
610 SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
611 inner_sql_type,
612 maybe_array_size,
613 )) => {
614 let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
615 if let Some(array_size) = maybe_array_size {
616 let array_size: i32 = (*array_size).try_into().map_err(|_| {
617 plan_datafusion_err!(
618 "Array size must be a positive 32 bit integer, got {array_size}"
619 )
620 })?;
621 Ok(inner_field.into_fixed_size_list(array_size))
622 } else {
623 Ok(inner_field.into_list())
624 }
625 }
626 SQLDataType::Array(ArrayElemTypeDef::None) => {
627 not_impl_err!("Arrays with unspecified type is not supported")
628 }
629 other => Ok(self
630 .convert_simple_data_type(other)?
631 .into_nullable_field_ref()),
632 }
633 }
634
635 fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
636 match sql_type {
637 SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
638 SQLDataType::TinyInt(_) => Ok(DataType::Int8),
639 SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
640 SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
641 Ok(DataType::Int32)
642 }
643 SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
644 SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
645 SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
646 Ok(DataType::UInt16)
647 }
648 SQLDataType::IntUnsigned(_)
649 | SQLDataType::IntegerUnsigned(_)
650 | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
651 SQLDataType::Varchar(length) => {
652 match (length, self.options.support_varchar_with_length) {
653 (Some(_), false) => plan_err!(
654 "does not support Varchar with length, \
655 please set `support_varchar_with_length` to be true"
656 ),
657 _ => {
658 if self.options.map_string_types_to_utf8view {
659 Ok(DataType::Utf8View)
660 } else {
661 Ok(DataType::Utf8)
662 }
663 }
664 }
665 }
666 SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
667 Ok(DataType::UInt64)
668 }
669 SQLDataType::Float(_) => Ok(DataType::Float32),
670 SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
671 SQLDataType::Double(ExactNumberInfo::None)
672 | SQLDataType::DoublePrecision
673 | SQLDataType::Float8 => Ok(DataType::Float64),
674 SQLDataType::Double(
675 ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
676 ) => {
677 not_impl_err!(
678 "Unsupported SQL type (precision/scale not supported) {sql_type}"
679 )
680 }
681 SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
682 if self.options.map_string_types_to_utf8view {
683 Ok(DataType::Utf8View)
684 } else {
685 Ok(DataType::Utf8)
686 }
687 }
688 SQLDataType::Timestamp(precision, tz_info)
689 if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
690 {
691 let tz = if matches!(tz_info, TimezoneInfo::Tz)
692 || matches!(tz_info, TimezoneInfo::WithTimeZone)
693 {
694 self.context_provider.options().execution.time_zone.clone()
698 } else {
699 None
701 };
702 let precision = match precision {
703 Some(0) => TimeUnit::Second,
704 Some(3) => TimeUnit::Millisecond,
705 Some(6) => TimeUnit::Microsecond,
706 None | Some(9) => TimeUnit::Nanosecond,
707 _ => unreachable!(),
708 };
709 Ok(DataType::Timestamp(precision, tz.map(Into::into)))
710 }
711 SQLDataType::Date => Ok(DataType::Date32),
712 SQLDataType::Time(None, tz_info) => {
713 if matches!(tz_info, TimezoneInfo::None)
714 || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
715 {
716 Ok(DataType::Time64(TimeUnit::Nanosecond))
717 } else {
718 not_impl_err!("Unsupported SQL type {sql_type}")
720 }
721 }
722 SQLDataType::Numeric(exact_number_info)
723 | SQLDataType::Decimal(exact_number_info) => {
724 let (precision, scale) = match *exact_number_info {
725 ExactNumberInfo::None => (None, None),
726 ExactNumberInfo::Precision(precision) => (Some(precision), None),
727 ExactNumberInfo::PrecisionAndScale(precision, scale) => {
728 (Some(precision), Some(scale))
729 }
730 };
731 make_decimal_type(precision, scale.map(|s| s as u64))
732 }
733 SQLDataType::Bytea => Ok(DataType::Binary),
734 SQLDataType::Interval { fields, precision } => {
735 if fields.is_some() || precision.is_some() {
736 return not_impl_err!("Unsupported SQL type {sql_type}");
737 }
738 Ok(DataType::Interval(IntervalUnit::MonthDayNano))
739 }
740 SQLDataType::Struct(fields, _) => {
741 let fields = fields
742 .iter()
743 .enumerate()
744 .map(|(idx, sql_struct_field)| {
745 let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
746 let field_name = match &sql_struct_field.field_name {
747 Some(ident) => ident.clone(),
748 None => Ident::new(format!("c{idx}")),
749 };
750 Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
751 })
752 .collect::<Result<Vec<_>>>()?;
753 Ok(DataType::Struct(Fields::from(fields)))
754 }
755 SQLDataType::Nvarchar(_)
756 | SQLDataType::JSON
757 | SQLDataType::Uuid
758 | SQLDataType::Binary(_)
759 | SQLDataType::Varbinary(_)
760 | SQLDataType::Blob(_)
761 | SQLDataType::Datetime(_)
762 | SQLDataType::Regclass
763 | SQLDataType::Custom(_, _)
764 | SQLDataType::Array(_)
765 | SQLDataType::Enum(_, _)
766 | SQLDataType::Set(_)
767 | SQLDataType::MediumInt(_)
768 | SQLDataType::MediumIntUnsigned(_)
769 | SQLDataType::Character(_)
770 | SQLDataType::CharacterVarying(_)
771 | SQLDataType::CharVarying(_)
772 | SQLDataType::CharacterLargeObject(_)
773 | SQLDataType::CharLargeObject(_)
774 | SQLDataType::Timestamp(_, _)
775 | SQLDataType::Time(Some(_), _)
776 | SQLDataType::Dec(_)
777 | SQLDataType::BigNumeric(_)
778 | SQLDataType::BigDecimal(_)
779 | SQLDataType::Clob(_)
780 | SQLDataType::Bytes(_)
781 | SQLDataType::Int64
782 | SQLDataType::Float64
783 | SQLDataType::JSONB
784 | SQLDataType::Unspecified
785 | SQLDataType::Int16
786 | SQLDataType::Int32
787 | SQLDataType::Int128
788 | SQLDataType::Int256
789 | SQLDataType::UInt8
790 | SQLDataType::UInt16
791 | SQLDataType::UInt32
792 | SQLDataType::UInt64
793 | SQLDataType::UInt128
794 | SQLDataType::UInt256
795 | SQLDataType::Float32
796 | SQLDataType::Date32
797 | SQLDataType::Datetime64(_, _)
798 | SQLDataType::FixedString(_)
799 | SQLDataType::Map(_, _)
800 | SQLDataType::Tuple(_)
801 | SQLDataType::Nested(_)
802 | SQLDataType::Union(_)
803 | SQLDataType::Nullable(_)
804 | SQLDataType::LowCardinality(_)
805 | SQLDataType::Trigger
806 | SQLDataType::TinyBlob
807 | SQLDataType::MediumBlob
808 | SQLDataType::LongBlob
809 | SQLDataType::TinyText
810 | SQLDataType::MediumText
811 | SQLDataType::LongText
812 | SQLDataType::Bit(_)
813 | SQLDataType::BitVarying(_)
814 | SQLDataType::Signed
815 | SQLDataType::SignedInteger
816 | SQLDataType::Unsigned
817 | SQLDataType::UnsignedInteger
818 | SQLDataType::AnyType
819 | SQLDataType::Table(_)
820 | SQLDataType::VarBit(_)
821 | SQLDataType::UTinyInt
822 | SQLDataType::USmallInt
823 | SQLDataType::HugeInt
824 | SQLDataType::UHugeInt
825 | SQLDataType::UBigInt
826 | SQLDataType::TimestampNtz
827 | SQLDataType::NamedTable { .. }
828 | SQLDataType::TsVector
829 | SQLDataType::TsQuery
830 | SQLDataType::GeometricType(_)
831 | SQLDataType::DecimalUnsigned(_) | SQLDataType::FloatUnsigned(_) | SQLDataType::RealUnsigned | SQLDataType::DecUnsigned(_) | SQLDataType::DoubleUnsigned(_) | SQLDataType::DoublePrecisionUnsigned => {
838 not_impl_err!("Unsupported SQL type {sql_type}")
839 }
840 }
841 }
842
843 pub(crate) fn object_name_to_table_reference(
844 &self,
845 object_name: ObjectName,
846 ) -> Result<TableReference> {
847 object_name_to_table_reference(
848 object_name,
849 self.options.enable_ident_normalization,
850 )
851 }
852}
853
854pub fn object_name_to_table_reference(
865 object_name: ObjectName,
866 enable_normalization: bool,
867) -> Result<TableReference> {
868 let ObjectName(object_name_parts) = object_name;
870 let idents = object_name_parts
871 .into_iter()
872 .map(|object_name_part| {
873 object_name_part.as_ident().cloned().ok_or_else(|| {
874 plan_datafusion_err!(
875 "Expected identifier, but found: {:?}",
876 object_name_part
877 )
878 })
879 })
880 .collect::<Result<Vec<_>>>()?;
881 idents_to_table_reference(idents, enable_normalization)
882}
883
884struct IdentTaker {
885 normalizer: IdentNormalizer,
886 idents: Vec<Ident>,
887}
888
889impl IdentTaker {
892 fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
893 Self {
894 normalizer: IdentNormalizer::new(enable_normalization),
895 idents,
896 }
897 }
898
899 fn take(&mut self) -> String {
900 let ident = self.idents.pop().expect("no more identifiers");
901 self.normalizer.normalize(ident)
902 }
903
904 fn len(&self) -> usize {
906 self.idents.len()
907 }
908}
909
910impl std::fmt::Display for IdentTaker {
912 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
913 let mut first = true;
914 for ident in self.idents.iter() {
915 if !first {
916 write!(f, ".")?;
917 }
918 write!(f, "{ident}")?;
919 first = false;
920 }
921
922 Ok(())
923 }
924}
925
926pub(crate) fn idents_to_table_reference(
928 idents: Vec<Ident>,
929 enable_normalization: bool,
930) -> Result<TableReference> {
931 let mut taker = IdentTaker::new(idents, enable_normalization);
932
933 match taker.len() {
934 1 => {
935 let table = taker.take();
936 Ok(TableReference::bare(table))
937 }
938 2 => {
939 let table = taker.take();
940 let schema = taker.take();
941 Ok(TableReference::partial(schema, table))
942 }
943 3 => {
944 let table = taker.take();
945 let schema = taker.take();
946 let catalog = taker.take();
947 Ok(TableReference::full(catalog, schema, table))
948 }
949 _ => plan_err!(
950 "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
951 taker,
952 taker.len()
953 ),
954 }
955}
956
957pub fn object_name_to_qualifier(
960 sql_table_name: &ObjectName,
961 enable_normalization: bool,
962) -> Result<String> {
963 let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
964 let normalizer = IdentNormalizer::new(enable_normalization);
965 sql_table_name
966 .0
967 .iter()
968 .rev()
969 .zip(columns)
970 .map(|(object_name_part, column_name)| {
971 object_name_part
972 .as_ident()
973 .map(|ident| {
974 format!(
975 r#"{} = '{}'"#,
976 column_name,
977 normalizer.normalize(ident.clone())
978 )
979 })
980 .ok_or_else(|| {
981 plan_datafusion_err!(
982 "Expected identifier, but found: {:?}",
983 object_name_part
984 )
985 })
986 })
987 .collect::<Result<Vec<_>>>()
988 .map(|parts| parts.join(" AND "))
989}