1use std::collections::HashMap;
20use std::sync::Arc;
21use std::vec;
22
23use arrow::datatypes::*;
24use datafusion_common::config::SqlParserOptions;
25use datafusion_common::error::add_possible_columns_to_diag;
26use datafusion_common::TableReference;
27use datafusion_common::{
28 field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, Diagnostic,
29 SchemaError,
30};
31use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
32use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
33use datafusion_expr::utils::find_column_exprs;
34use datafusion_expr::{col, Expr};
35use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
36use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
37use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
38
39use crate::utils::make_decimal_type;
40pub use datafusion_expr::planner::ContextProvider;
41
/// Options that control how [`SqlToRel`] plans SQL.
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Not consulted in this file; presumably makes float literals plan as
    /// decimals — NOTE(review): confirm against the expression planner.
    pub parse_float_as_decimal: bool,
    /// Whether identifiers are run through the identifier normalizer
    /// (see [`IdentNormalizer`]) instead of being used verbatim.
    pub enable_ident_normalization: bool,
    /// When `false`, `VARCHAR` with an explicit length is rejected with a
    /// planning error; when `true`, it is accepted.
    pub support_varchar_with_length: bool,
    /// Not consulted in this file; presumably controls normalization of
    /// option values — NOTE(review): confirm against the statement planner.
    pub enable_options_value_normalization: bool,
    /// Not consulted in this file; presumably enables source-span
    /// collection for diagnostics — NOTE(review): confirm.
    pub collect_spans: bool,
    /// When `true`, SQL string types (`VARCHAR`, `CHAR`, `TEXT`, `STRING`)
    /// map to `Utf8View`; otherwise they map to `Utf8`.
    pub map_string_types_to_utf8view: bool,
}

impl ParserOptions {
    /// Creates options with the built-in defaults:
    ///
    /// * `parse_float_as_decimal`: `false`
    /// * `enable_ident_normalization`: `true`
    /// * `support_varchar_with_length`: `true`
    /// * `map_string_types_to_utf8view`: `true`
    /// * `enable_options_value_normalization`: `false`
    /// * `collect_spans`: `false`
    pub fn new() -> Self {
        Self {
            // Enabled by default.
            enable_ident_normalization: true,
            support_varchar_with_length: true,
            map_string_types_to_utf8view: true,
            // Disabled by default.
            parse_float_as_decimal: false,
            enable_options_value_normalization: false,
            collect_spans: false,
        }
    }

    /// Returns a copy of `self` with `parse_float_as_decimal` set to `value`.
    pub fn with_parse_float_as_decimal(self, value: bool) -> Self {
        Self {
            parse_float_as_decimal: value,
            ..self
        }
    }

    /// Returns a copy of `self` with `enable_ident_normalization` set to `value`.
    pub fn with_enable_ident_normalization(self, value: bool) -> Self {
        Self {
            enable_ident_normalization: value,
            ..self
        }
    }

    /// Returns a copy of `self` with `support_varchar_with_length` set to `value`.
    pub fn with_support_varchar_with_length(self, value: bool) -> Self {
        Self {
            support_varchar_with_length: value,
            ..self
        }
    }

    /// Returns a copy of `self` with `map_string_types_to_utf8view` set to `value`.
    pub fn with_map_string_types_to_utf8view(self, value: bool) -> Self {
        Self {
            map_string_types_to_utf8view: value,
            ..self
        }
    }

    /// Returns a copy of `self` with `enable_options_value_normalization` set to `value`.
    pub fn with_enable_options_value_normalization(self, value: bool) -> Self {
        Self {
            enable_options_value_normalization: value,
            ..self
        }
    }

    /// Returns a copy of `self` with `collect_spans` set to `value`.
    pub fn with_collect_spans(self, value: bool) -> Self {
        Self {
            collect_spans: value,
            ..self
        }
    }
}

impl Default for ParserOptions {
    /// Same as [`ParserOptions::new`].
    fn default() -> Self {
        ParserOptions::new()
    }
}
139
140impl From<&SqlParserOptions> for ParserOptions {
141 fn from(options: &SqlParserOptions) -> Self {
142 Self {
143 parse_float_as_decimal: options.parse_float_as_decimal,
144 enable_ident_normalization: options.enable_ident_normalization,
145 support_varchar_with_length: options.support_varchar_with_length,
146 map_string_types_to_utf8view: options.map_string_types_to_utf8view,
147 enable_options_value_normalization: options
148 .enable_options_value_normalization,
149 collect_spans: options.collect_spans,
150 }
151 }
152}
153
154#[derive(Debug)]
156pub struct IdentNormalizer {
157 normalize: bool,
158}
159
160impl Default for IdentNormalizer {
161 fn default() -> Self {
162 Self { normalize: true }
163 }
164}
165
166impl IdentNormalizer {
167 pub fn new(normalize: bool) -> Self {
168 Self { normalize }
169 }
170
171 pub fn normalize(&self, ident: Ident) -> String {
172 if self.normalize {
173 crate::utils::normalize_ident(ident)
174 } else {
175 ident.value
176 }
177 }
178}
179
180#[derive(Debug, Clone)]
194pub struct PlannerContext {
195 prepare_param_data_types: Arc<Vec<DataType>>,
198 ctes: HashMap<String, Arc<LogicalPlan>>,
201 outer_query_schema: Option<DFSchemaRef>,
203 outer_from_schema: Option<DFSchemaRef>,
206 create_table_schema: Option<DFSchemaRef>,
208}
209
210impl Default for PlannerContext {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216impl PlannerContext {
217 pub fn new() -> Self {
219 Self {
220 prepare_param_data_types: Arc::new(vec![]),
221 ctes: HashMap::new(),
222 outer_query_schema: None,
223 outer_from_schema: None,
224 create_table_schema: None,
225 }
226 }
227
228 pub fn with_prepare_param_data_types(
230 mut self,
231 prepare_param_data_types: Vec<DataType>,
232 ) -> Self {
233 self.prepare_param_data_types = prepare_param_data_types.into();
234 self
235 }
236
237 pub fn outer_query_schema(&self) -> Option<&DFSchema> {
239 self.outer_query_schema.as_ref().map(|s| s.as_ref())
240 }
241
242 pub fn set_outer_query_schema(
245 &mut self,
246 mut schema: Option<DFSchemaRef>,
247 ) -> Option<DFSchemaRef> {
248 std::mem::swap(&mut self.outer_query_schema, &mut schema);
249 schema
250 }
251
252 pub fn set_table_schema(
253 &mut self,
254 mut schema: Option<DFSchemaRef>,
255 ) -> Option<DFSchemaRef> {
256 std::mem::swap(&mut self.create_table_schema, &mut schema);
257 schema
258 }
259
260 pub fn table_schema(&self) -> Option<DFSchemaRef> {
261 self.create_table_schema.clone()
262 }
263
264 pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
266 self.outer_from_schema.clone()
267 }
268
269 pub fn set_outer_from_schema(
271 &mut self,
272 mut schema: Option<DFSchemaRef>,
273 ) -> Option<DFSchemaRef> {
274 std::mem::swap(&mut self.outer_from_schema, &mut schema);
275 schema
276 }
277
278 pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
280 match self.outer_from_schema.as_mut() {
281 Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
282 None => self.outer_from_schema = Some(Arc::clone(schema)),
283 };
284 Ok(())
285 }
286
287 pub fn prepare_param_data_types(&self) -> &[DataType] {
289 &self.prepare_param_data_types
290 }
291
292 pub fn contains_cte(&self, cte_name: &str) -> bool {
295 self.ctes.contains_key(cte_name)
296 }
297
298 pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
301 let cte_name = cte_name.into();
302 self.ctes.insert(cte_name, Arc::new(plan));
303 }
304
305 pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
308 self.ctes.get(cte_name).map(|cte| cte.as_ref())
309 }
310
311 pub(super) fn remove_cte(&mut self, cte_name: &str) {
313 self.ctes.remove(cte_name);
314 }
315}
316
/// SQL planner state: a borrowed [`ContextProvider`] plus the options and
/// identifier normalizer used while converting SQL syntax into plans.
pub struct SqlToRel<'a, S: ContextProvider> {
    /// Provider borrowed for the planner's lifetime; this file reads its
    /// configuration options and its optional type planner.
    pub(crate) context_provider: &'a S,
    /// Options controlling planning behaviour (see [`ParserOptions`]).
    pub(crate) options: ParserOptions,
    /// Normalizer built by `new_with_options` from
    /// `options.enable_ident_normalization`.
    pub(crate) ident_normalizer: IdentNormalizer,
}
341
342impl<'a, S: ContextProvider> SqlToRel<'a, S> {
343 pub fn new(context_provider: &'a S) -> Self {
347 let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
348 Self::new_with_options(context_provider, parser_options)
349 }
350
351 pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
356 let ident_normalize = options.enable_ident_normalization;
357
358 SqlToRel {
359 context_provider,
360 options,
361 ident_normalizer: IdentNormalizer::new(ident_normalize),
362 }
363 }
364
365 pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
366 let mut fields = Vec::with_capacity(columns.len());
367
368 for column in columns {
369 let data_type = self.convert_data_type(&column.data_type)?;
370 let not_nullable = column
371 .options
372 .iter()
373 .any(|x| x.option == ColumnOption::NotNull);
374 fields.push(Field::new(
375 self.ident_normalizer.normalize(column.name),
376 data_type,
377 !not_nullable,
378 ));
379 }
380
381 Ok(Schema::new(fields))
382 }
383
384 pub(super) fn build_column_defaults(
386 &self,
387 columns: &Vec<SQLColumnDef>,
388 planner_context: &mut PlannerContext,
389 ) -> Result<Vec<(String, Expr)>> {
390 let mut column_defaults = vec![];
391 let empty_schema = DFSchema::empty();
393 let error_desc = |e: DataFusionError| match e {
394 DataFusionError::SchemaError(ref err, _)
395 if matches!(**err, SchemaError::FieldNotFound { .. }) =>
396 {
397 plan_datafusion_err!(
398 "Column reference is not allowed in the DEFAULT expression : {}",
399 e
400 )
401 }
402 _ => e,
403 };
404
405 for column in columns {
406 if let Some(default_sql_expr) =
407 column.options.iter().find_map(|o| match &o.option {
408 ColumnOption::Default(expr) => Some(expr),
409 _ => None,
410 })
411 {
412 let default_expr = self
413 .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
414 .map_err(error_desc)?;
415 column_defaults.push((
416 self.ident_normalizer.normalize(column.name.clone()),
417 default_expr,
418 ));
419 }
420 }
421 Ok(column_defaults)
422 }
423
424 pub(crate) fn apply_table_alias(
426 &self,
427 plan: LogicalPlan,
428 alias: TableAlias,
429 ) -> Result<LogicalPlan> {
430 let idents = alias.columns.into_iter().map(|c| c.name).collect();
431 let plan = self.apply_expr_alias(plan, idents)?;
432
433 LogicalPlanBuilder::from(plan)
434 .alias(TableReference::bare(
435 self.ident_normalizer.normalize(alias.name),
436 ))?
437 .build()
438 }
439
440 pub(crate) fn apply_expr_alias(
441 &self,
442 plan: LogicalPlan,
443 idents: Vec<Ident>,
444 ) -> Result<LogicalPlan> {
445 if idents.is_empty() {
446 Ok(plan)
447 } else if idents.len() != plan.schema().fields().len() {
448 plan_err!(
449 "Source table contains {} columns but only {} \
450 names given as column alias",
451 plan.schema().fields().len(),
452 idents.len()
453 )
454 } else {
455 let fields = plan.schema().fields().clone();
456 LogicalPlanBuilder::from(plan)
457 .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
458 col(field.name()).alias(self.ident_normalizer.normalize(ident))
459 }))?
460 .build()
461 }
462 }
463
464 pub(crate) fn validate_schema_satisfies_exprs(
466 &self,
467 schema: &DFSchema,
468 exprs: &[Expr],
469 ) -> Result<()> {
470 find_column_exprs(exprs)
471 .iter()
472 .try_for_each(|col| match col {
473 Expr::Column(col) => match &col.relation {
474 Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
475 None => {
476 if !schema.fields_with_unqualified_name(&col.name).is_empty() {
477 Ok(())
478 } else {
479 Err(field_not_found(
480 col.relation.clone(),
481 col.name.as_str(),
482 schema,
483 ))
484 }
485 }
486 }
487 .map_err(|err: DataFusionError| match &err {
488 DataFusionError::SchemaError(inner, _)
489 if matches!(
490 inner.as_ref(),
491 SchemaError::FieldNotFound { .. }
492 ) =>
493 {
494 let SchemaError::FieldNotFound {
495 field,
496 valid_fields,
497 } = inner.as_ref()
498 else {
499 unreachable!()
500 };
501 let mut diagnostic = if let Some(relation) = &col.relation {
502 Diagnostic::new_error(
503 format!(
504 "column '{}' not found in '{}'",
505 &col.name, relation
506 ),
507 col.spans().first(),
508 )
509 } else {
510 Diagnostic::new_error(
511 format!("column '{}' not found", &col.name),
512 col.spans().first(),
513 )
514 };
515 add_possible_columns_to_diag(
516 &mut diagnostic,
517 field,
518 valid_fields,
519 );
520 err.with_diagnostic(diagnostic)
521 }
522 _ => err,
523 }),
524 _ => internal_err!("Not a column"),
525 })
526 }
527
528 pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
529 if let Some(type_planner) = self.context_provider.get_type_planner() {
531 if let Some(data_type) = type_planner.plan_type(sql_type)? {
532 return Ok(data_type);
533 }
534 }
535
536 match sql_type {
538 SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
539 let inner_data_type = self.convert_data_type(inner_sql_type)?;
541 Ok(DataType::new_list(inner_data_type, true))
542 }
543 SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
544 inner_sql_type,
545 maybe_array_size,
546 )) => {
547 let inner_data_type = self.convert_data_type(inner_sql_type)?;
548 if let Some(array_size) = maybe_array_size {
549 Ok(DataType::new_fixed_size_list(
550 inner_data_type,
551 *array_size as i32,
552 true,
553 ))
554 } else {
555 Ok(DataType::new_list(inner_data_type, true))
556 }
557 }
558 SQLDataType::Array(ArrayElemTypeDef::None) => {
559 not_impl_err!("Arrays with unspecified type is not supported")
560 }
561 other => self.convert_simple_data_type(other),
562 }
563 }
564
565 fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
566 match sql_type {
567 SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
568 SQLDataType::TinyInt(_) => Ok(DataType::Int8),
569 SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
570 SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
571 Ok(DataType::Int32)
572 }
573 SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
574 SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
575 SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
576 Ok(DataType::UInt16)
577 }
578 SQLDataType::IntUnsigned(_)
579 | SQLDataType::IntegerUnsigned(_)
580 | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
581 SQLDataType::Varchar(length) => {
582 match (length, self.options.support_varchar_with_length) {
583 (Some(_), false) => plan_err!(
584 "does not support Varchar with length, \
585 please set `support_varchar_with_length` to be true"
586 ),
587 _ => {
588 if self.options.map_string_types_to_utf8view {
589 Ok(DataType::Utf8View)
590 } else {
591 Ok(DataType::Utf8)
592 }
593 }
594 }
595 }
596 SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
597 Ok(DataType::UInt64)
598 }
599 SQLDataType::Float(_) => Ok(DataType::Float32),
600 SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
601 SQLDataType::Double(ExactNumberInfo::None)
602 | SQLDataType::DoublePrecision
603 | SQLDataType::Float8 => Ok(DataType::Float64),
604 SQLDataType::Double(
605 ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
606 ) => {
607 not_impl_err!(
608 "Unsupported SQL type (precision/scale not supported) {sql_type}"
609 )
610 }
611 SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
612 if self.options.map_string_types_to_utf8view {
613 Ok(DataType::Utf8View)
614 } else {
615 Ok(DataType::Utf8)
616 }
617 }
618 SQLDataType::Timestamp(precision, tz_info)
619 if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
620 {
621 let tz = if matches!(tz_info, TimezoneInfo::Tz)
622 || matches!(tz_info, TimezoneInfo::WithTimeZone)
623 {
624 Some(self.context_provider.options().execution.time_zone.clone())
628 } else {
629 None
631 };
632 let precision = match precision {
633 Some(0) => TimeUnit::Second,
634 Some(3) => TimeUnit::Millisecond,
635 Some(6) => TimeUnit::Microsecond,
636 None | Some(9) => TimeUnit::Nanosecond,
637 _ => unreachable!(),
638 };
639 Ok(DataType::Timestamp(precision, tz.map(Into::into)))
640 }
641 SQLDataType::Date => Ok(DataType::Date32),
642 SQLDataType::Time(None, tz_info) => {
643 if matches!(tz_info, TimezoneInfo::None)
644 || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
645 {
646 Ok(DataType::Time64(TimeUnit::Nanosecond))
647 } else {
648 not_impl_err!("Unsupported SQL type {sql_type:?}")
650 }
651 }
652 SQLDataType::Numeric(exact_number_info)
653 | SQLDataType::Decimal(exact_number_info) => {
654 let (precision, scale) = match *exact_number_info {
655 ExactNumberInfo::None => (None, None),
656 ExactNumberInfo::Precision(precision) => (Some(precision), None),
657 ExactNumberInfo::PrecisionAndScale(precision, scale) => {
658 (Some(precision), Some(scale))
659 }
660 };
661 make_decimal_type(precision, scale)
662 }
663 SQLDataType::Bytea => Ok(DataType::Binary),
664 SQLDataType::Interval => Ok(DataType::Interval(IntervalUnit::MonthDayNano)),
665 SQLDataType::Struct(fields, _) => {
666 let fields = fields
667 .iter()
668 .enumerate()
669 .map(|(idx, field)| {
670 let data_type = self.convert_data_type(&field.field_type)?;
671 let field_name = match &field.field_name {
672 Some(ident) => ident.clone(),
673 None => Ident::new(format!("c{idx}")),
674 };
675 Ok(Arc::new(Field::new(
676 self.ident_normalizer.normalize(field_name),
677 data_type,
678 true,
679 )))
680 })
681 .collect::<Result<Vec<_>>>()?;
682 Ok(DataType::Struct(Fields::from(fields)))
683 }
684 SQLDataType::Nvarchar(_)
685 | SQLDataType::JSON
686 | SQLDataType::Uuid
687 | SQLDataType::Binary(_)
688 | SQLDataType::Varbinary(_)
689 | SQLDataType::Blob(_)
690 | SQLDataType::Datetime(_)
691 | SQLDataType::Regclass
692 | SQLDataType::Custom(_, _)
693 | SQLDataType::Array(_)
694 | SQLDataType::Enum(_, _)
695 | SQLDataType::Set(_)
696 | SQLDataType::MediumInt(_)
697 | SQLDataType::MediumIntUnsigned(_)
698 | SQLDataType::Character(_)
699 | SQLDataType::CharacterVarying(_)
700 | SQLDataType::CharVarying(_)
701 | SQLDataType::CharacterLargeObject(_)
702 | SQLDataType::CharLargeObject(_)
703 | SQLDataType::Timestamp(_, _)
704 | SQLDataType::Time(Some(_), _)
705 | SQLDataType::Dec(_)
706 | SQLDataType::BigNumeric(_)
707 | SQLDataType::BigDecimal(_)
708 | SQLDataType::Clob(_)
709 | SQLDataType::Bytes(_)
710 | SQLDataType::Int64
711 | SQLDataType::Float64
712 | SQLDataType::JSONB
713 | SQLDataType::Unspecified
714 | SQLDataType::Int16
715 | SQLDataType::Int32
716 | SQLDataType::Int128
717 | SQLDataType::Int256
718 | SQLDataType::UInt8
719 | SQLDataType::UInt16
720 | SQLDataType::UInt32
721 | SQLDataType::UInt64
722 | SQLDataType::UInt128
723 | SQLDataType::UInt256
724 | SQLDataType::Float32
725 | SQLDataType::Date32
726 | SQLDataType::Datetime64(_, _)
727 | SQLDataType::FixedString(_)
728 | SQLDataType::Map(_, _)
729 | SQLDataType::Tuple(_)
730 | SQLDataType::Nested(_)
731 | SQLDataType::Union(_)
732 | SQLDataType::Nullable(_)
733 | SQLDataType::LowCardinality(_)
734 | SQLDataType::Trigger
735 | SQLDataType::TinyBlob
736 | SQLDataType::MediumBlob
737 | SQLDataType::LongBlob
738 | SQLDataType::TinyText
739 | SQLDataType::MediumText
740 | SQLDataType::LongText
741 | SQLDataType::Bit(_)
742 | SQLDataType::BitVarying(_)
743 | SQLDataType::Signed
744 | SQLDataType::SignedInteger
745 | SQLDataType::Unsigned
746 | SQLDataType::UnsignedInteger
747 | SQLDataType::AnyType
748 | SQLDataType::Table(_)
749 | SQLDataType::VarBit(_)
750 | SQLDataType::GeometricType(_) => {
751 not_impl_err!("Unsupported SQL type {sql_type:?}")
752 }
753 }
754 }
755
756 pub(crate) fn object_name_to_table_reference(
757 &self,
758 object_name: ObjectName,
759 ) -> Result<TableReference> {
760 object_name_to_table_reference(
761 object_name,
762 self.options.enable_ident_normalization,
763 )
764 }
765}
766
767pub fn object_name_to_table_reference(
778 object_name: ObjectName,
779 enable_normalization: bool,
780) -> Result<TableReference> {
781 let ObjectName(object_name_parts) = object_name;
783 let idents = object_name_parts
784 .into_iter()
785 .map(|object_name_part| {
786 object_name_part.as_ident().cloned().ok_or_else(|| {
787 plan_datafusion_err!(
788 "Expected identifier, but found: {:?}",
789 object_name_part
790 )
791 })
792 })
793 .collect::<Result<Vec<_>>>()?;
794 idents_to_table_reference(idents, enable_normalization)
795}
796
797struct IdentTaker {
798 normalizer: IdentNormalizer,
799 idents: Vec<Ident>,
800}
801
802impl IdentTaker {
805 fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
806 Self {
807 normalizer: IdentNormalizer::new(enable_normalization),
808 idents,
809 }
810 }
811
812 fn take(&mut self) -> String {
813 let ident = self.idents.pop().expect("no more identifiers");
814 self.normalizer.normalize(ident)
815 }
816
817 fn len(&self) -> usize {
819 self.idents.len()
820 }
821}
822
823impl std::fmt::Display for IdentTaker {
825 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826 let mut first = true;
827 for ident in self.idents.iter() {
828 if !first {
829 write!(f, ".")?;
830 }
831 write!(f, "{ident}")?;
832 first = false;
833 }
834
835 Ok(())
836 }
837}
838
839pub(crate) fn idents_to_table_reference(
841 idents: Vec<Ident>,
842 enable_normalization: bool,
843) -> Result<TableReference> {
844 let mut taker = IdentTaker::new(idents, enable_normalization);
845
846 match taker.len() {
847 1 => {
848 let table = taker.take();
849 Ok(TableReference::bare(table))
850 }
851 2 => {
852 let table = taker.take();
853 let schema = taker.take();
854 Ok(TableReference::partial(schema, table))
855 }
856 3 => {
857 let table = taker.take();
858 let schema = taker.take();
859 let catalog = taker.take();
860 Ok(TableReference::full(catalog, schema, table))
861 }
862 _ => plan_err!(
863 "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
864 taker,
865 taker.len()
866 ),
867 }
868}
869
870pub fn object_name_to_qualifier(
873 sql_table_name: &ObjectName,
874 enable_normalization: bool,
875) -> Result<String> {
876 let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
877 let normalizer = IdentNormalizer::new(enable_normalization);
878 sql_table_name
879 .0
880 .iter()
881 .rev()
882 .zip(columns)
883 .map(|(object_name_part, column_name)| {
884 object_name_part
885 .as_ident()
886 .map(|ident| {
887 format!(
888 r#"{} = '{}'"#,
889 column_name,
890 normalizer.normalize(ident.clone())
891 )
892 })
893 .ok_or_else(|| {
894 plan_datafusion_err!(
895 "Expected identifier, but found: {:?}",
896 object_name_part
897 )
898 })
899 })
900 .collect::<Result<Vec<_>>>()
901 .map(|parts| parts.join(" AND "))
902}