use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::vec;

use arrow::datatypes::*;
use datafusion_common::config::SqlParserOptions;
use datafusion_common::error::add_possible_columns_to_diag;
use datafusion_common::TableReference;
use datafusion_common::{
    field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, Diagnostic,
    SchemaError,
};
use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::utils::find_column_exprs;
use datafusion_expr::{col, Expr};
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};

use crate::utils::make_decimal_type;
pub use datafusion_expr::planner::ContextProvider;

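/// Options that control how SQL statements are parsed and translated into
/// logical plans by [`SqlToRel`].
///
/// # Example
///
/// A minimal sketch of builder-style configuration (this assumes the type is
/// reachable as `datafusion_sql::planner::ParserOptions`):
///
/// ```
/// use datafusion_sql::planner::ParserOptions;
///
/// // Start from the defaults and opt into parsing float literals as decimals.
/// let options = ParserOptions::new().with_parse_float_as_decimal(true);
/// assert!(options.parse_float_as_decimal);
/// assert!(options.enable_ident_normalization);
/// ```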
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Parse floating-point literals as decimal values instead of `Float64`.
    pub parse_float_as_decimal: bool,
    /// Fold unquoted identifiers to lowercase.
    pub enable_ident_normalization: bool,
    /// Accept `VARCHAR` declared with a length (the length itself is not enforced).
    pub support_varchar_with_length: bool,
    /// Normalize string values supplied in SQL `OPTIONS (...)` clauses.
    pub enable_options_value_normalization: bool,
    /// Record source spans while planning, for richer diagnostics.
    pub collect_spans: bool,
    /// Map SQL string types (`VARCHAR`, `TEXT`, ...) to `Utf8View` instead of `Utf8`.
    pub map_string_types_to_utf8view: bool,
    /// Default placement of NULLs in `ORDER BY` when the query does not specify it.
    pub default_null_ordering: NullOrdering,
}

impl ParserOptions {
    /// Creates a new `ParserOptions` with default values.
    pub fn new() -> Self {
        Self {
            parse_float_as_decimal: false,
            enable_ident_normalization: true,
            support_varchar_with_length: true,
            map_string_types_to_utf8view: true,
            enable_options_value_normalization: false,
            collect_spans: false,
            default_null_ordering: NullOrdering::NullsMax,
        }
    }

    /// Sets the `parse_float_as_decimal` option.
    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
        self.parse_float_as_decimal = value;
        self
    }

    /// Sets the `enable_ident_normalization` option.
    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
        self.enable_ident_normalization = value;
        self
    }

    /// Sets the `support_varchar_with_length` option.
    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
        self.support_varchar_with_length = value;
        self
    }

    /// Sets the `map_string_types_to_utf8view` option.
    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
        self.map_string_types_to_utf8view = value;
        self
    }

    /// Sets the `enable_options_value_normalization` option.
    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
        self.enable_options_value_normalization = value;
        self
    }

    /// Sets the `collect_spans` option.
    pub fn with_collect_spans(mut self, value: bool) -> Self {
        self.collect_spans = value;
        self
    }

    /// Sets the `default_null_ordering` option.
    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
        self.default_null_ordering = value;
        self
    }
}

impl Default for ParserOptions {
    fn default() -> Self {
        Self::new()
    }
}

impl From<&SqlParserOptions> for ParserOptions {
    fn from(options: &SqlParserOptions) -> Self {
        Self {
            parse_float_as_decimal: options.parse_float_as_decimal,
            enable_ident_normalization: options.enable_ident_normalization,
            support_varchar_with_length: options.support_varchar_with_length,
            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
            enable_options_value_normalization: options
                .enable_options_value_normalization,
            collect_spans: options.collect_spans,
            default_null_ordering: options.default_null_ordering.as_str().into(),
        }
    }
}

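/// Controls where NULLs appear in `ORDER BY` results when the query does not
/// specify `NULLS FIRST` or `NULLS LAST` explicitly.
///
/// # Example
///
/// A minimal sketch (this assumes the enum is reachable as
/// `datafusion_sql::planner::NullOrdering`):
///
/// ```
/// use datafusion_sql::planner::NullOrdering;
///
/// // With `NullsMax`, NULLs behave like the largest value: they sort last in
/// // ascending order and first in descending order.
/// assert!(!NullOrdering::NullsMax.nulls_first(true));
/// assert!(NullOrdering::NullsMax.nulls_first(false));
/// ```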
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
    /// NULLs sort as if they were the largest value: last in ascending order,
    /// first in descending order.
    NullsMax,
    /// NULLs sort as if they were the smallest value: first in ascending order,
    /// last in descending order.
    NullsMin,
    /// NULLs always sort first, regardless of direction.
    NullsFirst,
    /// NULLs always sort last, regardless of direction.
    NullsLast,
}

impl NullOrdering {
    /// Returns `true` if NULLs should be placed first for the given sort
    /// direction (`asc` is `true` for ascending order).
    pub fn nulls_first(&self, asc: bool) -> bool {
        match self {
            Self::NullsMax => !asc,
            Self::NullsMin => asc,
            Self::NullsFirst => true,
            Self::NullsLast => false,
        }
    }
}

impl FromStr for NullOrdering {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self> {
        match s {
            "nulls_max" => Ok(Self::NullsMax),
            "nulls_min" => Ok(Self::NullsMin),
            "nulls_first" => Ok(Self::NullsFirst),
            "nulls_last" => Ok(Self::NullsLast),
            _ => plan_err!("Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"),
        }
    }
}

/// Falls back to [`NullOrdering::NullsMax`] if `s` is not a recognized ordering.
impl From<&str> for NullOrdering {
    fn from(s: &str) -> Self {
        Self::from_str(s).unwrap_or(Self::NullsMax)
    }
}

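/// Normalizes SQL [`Ident`]s when enabled: unquoted identifiers are folded to
/// lowercase, while quoted identifiers keep their original casing.
///
/// # Example
///
/// A minimal sketch (this assumes the type is reachable as
/// `datafusion_sql::planner::IdentNormalizer`):
///
/// ```
/// use datafusion_sql::planner::IdentNormalizer;
/// use sqlparser::ast::Ident;
///
/// let normalizer = IdentNormalizer::new(true);
/// assert_eq!(normalizer.normalize(Ident::new("MyTable")), "mytable");
///
/// // With normalization disabled, the identifier is returned unchanged.
/// let verbatim = IdentNormalizer::new(false);
/// assert_eq!(verbatim.normalize(Ident::new("MyTable")), "MyTable");
/// ```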
#[derive(Debug)]
pub struct IdentNormalizer {
    normalize: bool,
}

impl Default for IdentNormalizer {
    fn default() -> Self {
        Self { normalize: true }
    }
}

impl IdentNormalizer {
    pub fn new(normalize: bool) -> Self {
        Self { normalize }
    }

    /// Returns the identifier as a `String`, lowercasing it when normalization
    /// is enabled and the identifier is unquoted.
    pub fn normalize(&self, ident: Ident) -> String {
        if self.normalize {
            crate::utils::normalize_ident(ident)
        } else {
            ident.value
        }
    }
}

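/// Mutable state that is threaded through the planning of a single statement,
/// such as CTEs currently in scope and the schemas of enclosing queries.
///
/// # Example
///
/// A minimal sketch (this assumes the type is reachable as
/// `datafusion_sql::planner::PlannerContext`):
///
/// ```
/// use arrow::datatypes::DataType;
/// use datafusion_sql::planner::PlannerContext;
///
/// let context = PlannerContext::new()
///     .with_prepare_param_data_types(vec![DataType::Int32]);
/// assert_eq!(context.prepare_param_data_types(), &[DataType::Int32]);
/// assert!(!context.contains_cte("my_cte"));
/// ```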
#[derive(Debug, Clone)]
pub struct PlannerContext {
    /// Data types of the parameters declared in a `PREPARE` statement.
    prepare_param_data_types: Arc<Vec<DataType>>,
    /// Common table expressions (CTEs) currently in scope, keyed by name.
    ctes: HashMap<String, Arc<LogicalPlan>>,
    /// Schema of the outer query, used to resolve columns in correlated subqueries.
    outer_query_schema: Option<DFSchemaRef>,
    /// Schema of the outer `FROM` clause, used to resolve lateral column references.
    outer_from_schema: Option<DFSchemaRef>,
    /// Schema of the target table, if the statement being planned defines one
    /// (set via [`Self::set_table_schema`]).
    create_table_schema: Option<DFSchemaRef>,
}

impl Default for PlannerContext {
    fn default() -> Self {
        Self::new()
    }
}

impl PlannerContext {
    /// Creates an empty `PlannerContext`.
    pub fn new() -> Self {
        Self {
            prepare_param_data_types: Arc::new(vec![]),
            ctes: HashMap::new(),
            outer_query_schema: None,
            outer_from_schema: None,
            create_table_schema: None,
        }
    }

    /// Sets the data types of the parameters declared in a `PREPARE` statement.
    pub fn with_prepare_param_data_types(
        mut self,
        prepare_param_data_types: Vec<DataType>,
    ) -> Self {
        self.prepare_param_data_types = prepare_param_data_types.into();
        self
    }

    /// Returns the outer query schema, if any.
    pub fn outer_query_schema(&self) -> Option<&DFSchema> {
        self.outer_query_schema.as_ref().map(|s| s.as_ref())
    }

    /// Sets the outer query schema, returning the previous value, if any.
    pub fn set_outer_query_schema(
        &mut self,
        mut schema: Option<DFSchemaRef>,
    ) -> Option<DFSchemaRef> {
        std::mem::swap(&mut self.outer_query_schema, &mut schema);
        schema
    }

    /// Sets the target table schema, returning the previous value, if any.
    pub fn set_table_schema(
        &mut self,
        mut schema: Option<DFSchemaRef>,
    ) -> Option<DFSchemaRef> {
        std::mem::swap(&mut self.create_table_schema, &mut schema);
        schema
    }

    /// Returns the target table schema, if any.
    pub fn table_schema(&self) -> Option<DFSchemaRef> {
        self.create_table_schema.clone()
    }

    /// Returns the outer `FROM` schema, if any.
    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
        self.outer_from_schema.clone()
    }

    /// Sets the outer `FROM` schema, returning the previous value, if any.
    pub fn set_outer_from_schema(
        &mut self,
        mut schema: Option<DFSchemaRef>,
    ) -> Option<DFSchemaRef> {
        std::mem::swap(&mut self.outer_from_schema, &mut schema);
        schema
    }

    /// Extends the outer `FROM` schema by merging in `schema`.
    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
        match self.outer_from_schema.as_mut() {
            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
            None => self.outer_from_schema = Some(Arc::clone(schema)),
        };
        Ok(())
    }

    /// Returns the data types of the parameters declared in a `PREPARE` statement.
    pub fn prepare_param_data_types(&self) -> &[DataType] {
        &self.prepare_param_data_types
    }

    /// Returns `true` if a CTE with the given name is in scope.
    pub fn contains_cte(&self, cte_name: &str) -> bool {
        self.ctes.contains_key(cte_name)
    }

    /// Inserts a CTE plan under `cte_name`, replacing any existing entry.
    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
        let cte_name = cte_name.into();
        self.ctes.insert(cte_name, Arc::new(plan));
    }

    /// Returns the plan registered for the named CTE, if any.
    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
        self.ctes.get(cte_name).map(|cte| cte.as_ref())
    }

    /// Removes the named CTE from scope.
    pub(super) fn remove_cte(&mut self, cte_name: &str) {
        self.ctes.remove(cte_name);
    }
}

/// Translates SQL AST nodes produced by `sqlparser` into DataFusion
/// [`LogicalPlan`]s, resolving tables, functions, and types through the
/// supplied [`ContextProvider`].
pub struct SqlToRel<'a, S: ContextProvider> {
    pub(crate) context_provider: &'a S,
    pub(crate) options: ParserOptions,
    pub(crate) ident_normalizer: IdentNormalizer,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
    /// Creates a new query planner, taking parser options from the
    /// [`ContextProvider`]'s configuration.
    pub fn new(context_provider: &'a S) -> Self {
        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
        Self::new_with_options(context_provider, parser_options)
    }

    /// Creates a new query planner with explicit [`ParserOptions`].
    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
        let ident_normalize = options.enable_ident_normalization;

        SqlToRel {
            context_provider,
            options,
            ident_normalizer: IdentNormalizer::new(ident_normalize),
        }
    }

    /// Builds an Arrow [`Schema`] from SQL column definitions, honoring any
    /// `NOT NULL` constraints.
    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
        let mut fields = Vec::with_capacity(columns.len());

        for column in columns {
            let data_type = self.convert_data_type(&column.data_type)?;
            let not_nullable = column
                .options
                .iter()
                .any(|x| x.option == ColumnOption::NotNull);
            fields.push(Field::new(
                self.ident_normalizer.normalize(column.name),
                data_type,
                !not_nullable,
            ));
        }

        Ok(Schema::new(fields))
    }

    /// Returns `(column_name, default_expr)` pairs for all columns that declare
    /// a `DEFAULT` expression.
    pub(super) fn build_column_defaults(
        &self,
        columns: &Vec<SQLColumnDef>,
        planner_context: &mut PlannerContext,
    ) -> Result<Vec<(String, Expr)>> {
        let mut column_defaults = vec![];
        // Default expressions are planned against an empty schema; column
        // references are therefore rejected.
        let empty_schema = DFSchema::empty();
        let error_desc = |e: DataFusionError| match e {
            DataFusionError::SchemaError(ref err, _)
                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
            {
                plan_datafusion_err!(
                    "Column reference is not allowed in the DEFAULT expression: {}",
                    e
                )
            }
            _ => e,
        };

        for column in columns {
            if let Some(default_sql_expr) =
                column.options.iter().find_map(|o| match &o.option {
                    ColumnOption::Default(expr) => Some(expr),
                    _ => None,
                })
            {
                let default_expr = self
                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
                    .map_err(error_desc)?;
                column_defaults.push((
                    self.ident_normalizer.normalize(column.name.clone()),
                    default_expr,
                ));
            }
        }
        Ok(column_defaults)
    }

    /// Wraps the plan in an alias for `alias.name` and applies any column
    /// aliases given in the `TableAlias`.
    pub(crate) fn apply_table_alias(
        &self,
        plan: LogicalPlan,
        alias: TableAlias,
    ) -> Result<LogicalPlan> {
        let idents = alias.columns.into_iter().map(|c| c.name).collect();
        let plan = self.apply_expr_alias(plan, idents)?;

        LogicalPlanBuilder::from(plan)
            .alias(TableReference::bare(
                self.ident_normalizer.normalize(alias.name),
            ))?
            .build()
    }

    /// Renames the plan's columns to `idents`; errors if the number of aliases
    /// does not match the number of columns.
    pub(crate) fn apply_expr_alias(
        &self,
        plan: LogicalPlan,
        idents: Vec<Ident>,
    ) -> Result<LogicalPlan> {
        if idents.is_empty() {
            Ok(plan)
        } else if idents.len() != plan.schema().fields().len() {
            plan_err!(
                "Source table contains {} columns but only {} \
                names given as column alias",
                plan.schema().fields().len(),
                idents.len()
            )
        } else {
            let fields = plan.schema().fields().clone();
            LogicalPlanBuilder::from(plan)
                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
                }))?
                .build()
        }
    }

    /// Verifies that every column referenced in `exprs` can be resolved against
    /// `schema`, attaching a diagnostic with candidate columns otherwise.
    pub(crate) fn validate_schema_satisfies_exprs(
        &self,
        schema: &DFSchema,
        exprs: &[Expr],
    ) -> Result<()> {
        find_column_exprs(exprs)
            .iter()
            .try_for_each(|col| match col {
                Expr::Column(col) => match &col.relation {
                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
                    None => {
                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
                            Ok(())
                        } else {
                            Err(field_not_found(
                                col.relation.clone(),
                                col.name.as_str(),
                                schema,
                            ))
                        }
                    }
                }
                .map_err(|err: DataFusionError| match &err {
                    DataFusionError::SchemaError(inner, _)
                        if matches!(
                            inner.as_ref(),
                            SchemaError::FieldNotFound { .. }
                        ) =>
                    {
                        let SchemaError::FieldNotFound {
                            field,
                            valid_fields,
                        } = inner.as_ref()
                        else {
                            unreachable!()
                        };
                        let mut diagnostic = if let Some(relation) = &col.relation {
                            Diagnostic::new_error(
                                format!(
                                    "column '{}' not found in '{}'",
                                    &col.name, relation
                                ),
                                col.spans().first(),
                            )
                        } else {
                            Diagnostic::new_error(
                                format!("column '{}' not found", &col.name),
                                col.spans().first(),
                            )
                        };
                        add_possible_columns_to_diag(
                            &mut diagnostic,
                            field,
                            valid_fields,
                        );
                        err.with_diagnostic(diagnostic)
                    }
                    _ => err,
                }),
                _ => internal_err!("Not a column"),
            })
    }

    /// Converts a SQL data type to an Arrow [`DataType`], delegating first to
    /// the [`ContextProvider`]'s type planner, if one is registered.
    pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
        // A user-provided type planner takes precedence over the built-in rules.
        if let Some(type_planner) = self.context_provider.get_type_planner() {
            if let Some(data_type) = type_planner.plan_type(sql_type)? {
                return Ok(data_type);
            }
        }

        match sql_type {
            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
                let inner_data_type = self.convert_data_type(inner_sql_type)?;
                Ok(DataType::new_list(inner_data_type, true))
            }
            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
                inner_sql_type,
                maybe_array_size,
            )) => {
                let inner_data_type = self.convert_data_type(inner_sql_type)?;
                if let Some(array_size) = maybe_array_size {
                    Ok(DataType::new_fixed_size_list(
                        inner_data_type,
                        *array_size as i32,
                        true,
                    ))
                } else {
                    Ok(DataType::new_list(inner_data_type, true))
                }
            }
            SQLDataType::Array(ArrayElemTypeDef::None) => {
                not_impl_err!("Arrays with unspecified type are not supported")
            }
            other => self.convert_simple_data_type(other),
        }
    }

    /// Converts a simple (non-array) SQL data type to an Arrow [`DataType`].
    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
        match sql_type {
            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
                Ok(DataType::Int32)
            }
            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
                Ok(DataType::UInt16)
            }
            SQLDataType::IntUnsigned(_)
            | SQLDataType::IntegerUnsigned(_)
            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
            SQLDataType::Varchar(length) => {
                match (length, self.options.support_varchar_with_length) {
                    (Some(_), false) => plan_err!(
                        "does not support Varchar with length, \
                        please set `support_varchar_with_length` to be true"
                    ),
                    _ => {
                        if self.options.map_string_types_to_utf8view {
                            Ok(DataType::Utf8View)
                        } else {
                            Ok(DataType::Utf8)
                        }
                    }
                }
            }
            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
                Ok(DataType::UInt64)
            }
            SQLDataType::Float(_) => Ok(DataType::Float32),
            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
            SQLDataType::Double(ExactNumberInfo::None)
            | SQLDataType::DoublePrecision
            | SQLDataType::Float8 => Ok(DataType::Float64),
            SQLDataType::Double(
                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
            ) => {
                not_impl_err!(
                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
                )
            }
            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
                if self.options.map_string_types_to_utf8view {
                    Ok(DataType::Utf8View)
                } else {
                    Ok(DataType::Utf8)
                }
            }
            // Only precisions 0 (seconds), 3 (milliseconds), 6 (microseconds) and
            // 9 (nanoseconds) map onto Arrow timestamp units.
            SQLDataType::Timestamp(precision, tz_info)
                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
            {
                // TIMESTAMP WITH TIME ZONE uses the session time zone from the
                // execution options; plain TIMESTAMP carries no time zone.
                let tz = if matches!(tz_info, TimezoneInfo::Tz)
                    || matches!(tz_info, TimezoneInfo::WithTimeZone)
                {
                    Some(self.context_provider.options().execution.time_zone.clone())
                } else {
                    None
                };
                let precision = match precision {
                    Some(0) => TimeUnit::Second,
                    Some(3) => TimeUnit::Millisecond,
                    Some(6) => TimeUnit::Microsecond,
                    None | Some(9) => TimeUnit::Nanosecond,
                    _ => unreachable!(),
                };
                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
            }
            SQLDataType::Date => Ok(DataType::Date32),
            SQLDataType::Time(None, tz_info) => {
                if matches!(tz_info, TimezoneInfo::None)
                    || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
                {
                    Ok(DataType::Time64(TimeUnit::Nanosecond))
                } else {
                    not_impl_err!("Unsupported SQL type {sql_type:?}")
                }
            }
            SQLDataType::Numeric(exact_number_info)
            | SQLDataType::Decimal(exact_number_info) => {
                let (precision, scale) = match *exact_number_info {
                    ExactNumberInfo::None => (None, None),
                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
                        (Some(precision), Some(scale))
                    }
                };
                make_decimal_type(precision, scale)
            }
            SQLDataType::Bytea => Ok(DataType::Binary),
            SQLDataType::Interval => Ok(DataType::Interval(IntervalUnit::MonthDayNano)),
            SQLDataType::Struct(fields, _) => {
                let fields = fields
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| {
                        let data_type = self.convert_data_type(&field.field_type)?;
                        // Unnamed struct fields get positional names: c0, c1, ...
                        let field_name = match &field.field_name {
                            Some(ident) => ident.clone(),
                            None => Ident::new(format!("c{idx}")),
                        };
                        Ok(Arc::new(Field::new(
                            self.ident_normalizer.normalize(field_name),
                            data_type,
                            true,
                        )))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(DataType::Struct(Fields::from(fields)))
            }
            SQLDataType::Nvarchar(_)
            | SQLDataType::JSON
            | SQLDataType::Uuid
            | SQLDataType::Binary(_)
            | SQLDataType::Varbinary(_)
            | SQLDataType::Blob(_)
            | SQLDataType::Datetime(_)
            | SQLDataType::Regclass
            | SQLDataType::Custom(_, _)
            | SQLDataType::Array(_)
            | SQLDataType::Enum(_, _)
            | SQLDataType::Set(_)
            | SQLDataType::MediumInt(_)
            | SQLDataType::MediumIntUnsigned(_)
            | SQLDataType::Character(_)
            | SQLDataType::CharacterVarying(_)
            | SQLDataType::CharVarying(_)
            | SQLDataType::CharacterLargeObject(_)
            | SQLDataType::CharLargeObject(_)
            | SQLDataType::Timestamp(_, _)
            | SQLDataType::Time(Some(_), _)
            | SQLDataType::Dec(_)
            | SQLDataType::BigNumeric(_)
            | SQLDataType::BigDecimal(_)
            | SQLDataType::Clob(_)
            | SQLDataType::Bytes(_)
            | SQLDataType::Int64
            | SQLDataType::Float64
            | SQLDataType::JSONB
            | SQLDataType::Unspecified
            | SQLDataType::Int16
            | SQLDataType::Int32
            | SQLDataType::Int128
            | SQLDataType::Int256
            | SQLDataType::UInt8
            | SQLDataType::UInt16
            | SQLDataType::UInt32
            | SQLDataType::UInt64
            | SQLDataType::UInt128
            | SQLDataType::UInt256
            | SQLDataType::Float32
            | SQLDataType::Date32
            | SQLDataType::Datetime64(_, _)
            | SQLDataType::FixedString(_)
            | SQLDataType::Map(_, _)
            | SQLDataType::Tuple(_)
            | SQLDataType::Nested(_)
            | SQLDataType::Union(_)
            | SQLDataType::Nullable(_)
            | SQLDataType::LowCardinality(_)
            | SQLDataType::Trigger
            | SQLDataType::TinyBlob
            | SQLDataType::MediumBlob
            | SQLDataType::LongBlob
            | SQLDataType::TinyText
            | SQLDataType::MediumText
            | SQLDataType::LongText
            | SQLDataType::Bit(_)
            | SQLDataType::BitVarying(_)
            | SQLDataType::Signed
            | SQLDataType::SignedInteger
            | SQLDataType::Unsigned
            | SQLDataType::UnsignedInteger
            | SQLDataType::AnyType
            | SQLDataType::Table(_)
            | SQLDataType::VarBit(_)
            | SQLDataType::UTinyInt
            | SQLDataType::USmallInt
            | SQLDataType::HugeInt
            | SQLDataType::UHugeInt
            | SQLDataType::UBigInt
            | SQLDataType::TimestampNtz
            | SQLDataType::NamedTable { .. }
            | SQLDataType::TsVector
            | SQLDataType::TsQuery
            | SQLDataType::GeometricType(_) => {
                not_impl_err!("Unsupported SQL type {sql_type:?}")
            }
        }
    }

    /// Converts a SQL [`ObjectName`] into a [`TableReference`], applying
    /// identifier normalization according to the parser options.
    pub(crate) fn object_name_to_table_reference(
        &self,
        object_name: ObjectName,
    ) -> Result<TableReference> {
        object_name_to_table_reference(
            object_name,
            self.options.enable_ident_normalization,
        )
    }
}

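/// Converts a SQL [`ObjectName`] (for example `catalog.schema.table`) into a
/// [`TableReference`], optionally normalizing each identifier.
///
/// # Example
///
/// A minimal sketch (this assumes the function is reachable as
/// `datafusion_sql::planner::object_name_to_table_reference` and that
/// `ObjectName` can be built from a `Vec<Ident>`):
///
/// ```
/// use datafusion_sql::planner::object_name_to_table_reference;
/// use sqlparser::ast::{Ident, ObjectName};
///
/// let name = ObjectName::from(vec![Ident::new("MySchema"), Ident::new("MyTable")]);
/// let table_ref = object_name_to_table_reference(name, true).unwrap();
/// assert_eq!(table_ref.to_string(), "myschema.mytable");
/// ```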
pub fn object_name_to_table_reference(
    object_name: ObjectName,
    enable_normalization: bool,
) -> Result<TableReference> {
    let ObjectName(object_name_parts) = object_name;
    let idents = object_name_parts
        .into_iter()
        .map(|object_name_part| {
            object_name_part.as_ident().cloned().ok_or_else(|| {
                plan_datafusion_err!(
                    "Expected identifier, but found: {:?}",
                    object_name_part
                )
            })
        })
        .collect::<Result<Vec<_>>>()?;
    idents_to_table_reference(idents, enable_normalization)
}

/// Pops identifiers off the end of a compound identifier, normalizing each one
/// as it is taken.
struct IdentTaker {
    normalizer: IdentNormalizer,
    idents: Vec<Ident>,
}

impl IdentTaker {
    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
        Self {
            normalizer: IdentNormalizer::new(enable_normalization),
            idents,
        }
    }

    /// Takes the next identifier from the end of the list, panicking if none
    /// remain.
    fn take(&mut self) -> String {
        let ident = self.idents.pop().expect("no more identifiers");
        self.normalizer.normalize(ident)
    }

    /// Returns the number of identifiers that have not been taken yet.
    fn len(&self) -> usize {
        self.idents.len()
    }
}

impl std::fmt::Display for IdentTaker {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut first = true;
        for ident in self.idents.iter() {
            if !first {
                write!(f, ".")?;
            }
            write!(f, "{ident}")?;
            first = false;
        }

        Ok(())
    }
}

/// Converts a list of identifiers into a [`TableReference`]: the last
/// identifier is the table, optionally preceded by schema and catalog.
pub(crate) fn idents_to_table_reference(
    idents: Vec<Ident>,
    enable_normalization: bool,
) -> Result<TableReference> {
    let mut taker = IdentTaker::new(idents, enable_normalization);

    match taker.len() {
        1 => {
            let table = taker.take();
            Ok(TableReference::bare(table))
        }
        2 => {
            let table = taker.take();
            let schema = taker.take();
            Ok(TableReference::partial(schema, table))
        }
        3 => {
            let table = taker.take();
            let schema = taker.take();
            let catalog = taker.take();
            Ok(TableReference::full(catalog, schema, table))
        }
        _ => plan_err!(
            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
            taker,
            taker.len()
        ),
    }
}

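/// Builds an information-schema style qualifier from a (possibly compound)
/// table name, producing equality predicates such as
/// `table_name = 'mytable' AND table_schema = 'myschema'`.
///
/// # Example
///
/// A minimal sketch (this assumes the function is reachable as
/// `datafusion_sql::planner::object_name_to_qualifier` and that `ObjectName`
/// can be built from a `Vec<Ident>`):
///
/// ```
/// use datafusion_sql::planner::object_name_to_qualifier;
/// use sqlparser::ast::{Ident, ObjectName};
///
/// let name = ObjectName::from(vec![Ident::new("MySchema"), Ident::new("MyTable")]);
/// let qualifier = object_name_to_qualifier(&name, true).unwrap();
/// assert_eq!(
///     qualifier,
///     "table_name = 'mytable' AND table_schema = 'myschema'"
/// );
/// ```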
pub fn object_name_to_qualifier(
    sql_table_name: &ObjectName,
    enable_normalization: bool,
) -> Result<String> {
    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
    let normalizer = IdentNormalizer::new(enable_normalization);
    sql_table_name
        .0
        .iter()
        .rev()
        .zip(columns)
        .map(|(object_name_part, column_name)| {
            object_name_part
                .as_ident()
                .map(|ident| {
                    format!(
                        r#"{} = '{}'"#,
                        column_name,
                        normalizer.normalize(ident.clone())
                    )
                })
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "Expected identifier, but found: {:?}",
                        object_name_part
                    )
                })
        })
        .collect::<Result<Vec<_>>>()
        .map(|parts| parts.join(" AND "))
}