Skip to main content

datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{_plan_err, _schema_err, DataFusionError, Result};
27use crate::{
28    Column, FunctionalDependencies, SchemaError, TableReference, field_not_found,
29    unqualified_field_not_found,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
37/// A reference-counted reference to a [DFSchema].
38pub type DFSchemaRef = Arc<DFSchema>;
39
/// DFSchema wraps an Arrow schema and adds a relation (table) name.
41///
42/// The schema may hold the fields across multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified fields must be unique not only amongst themselves, but also must
47/// have a distinct name from any qualified field names. This allows finding a
48/// qualified field by name to be possible, so long as there aren't multiple
49/// qualified fields with the same name.
///
51/// # See Also
52/// * [DFSchemaRef], an alias to `Arc<DFSchema>`
53/// * [DataTypeExt], common methods for working with Arrow [DataType]s
54/// * [FieldExt], extension methods for working with Arrow [Field]s
55///
56/// [DataTypeExt]: crate::datatype::DataTypeExt
57/// [FieldExt]: crate::datatype::FieldExt
58///
59/// # Creating qualified schemas
60///
61/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
62/// an Arrow schema.
63///
64/// ```rust
65/// use arrow::datatypes::{DataType, Field, Schema};
66/// use datafusion_common::{Column, DFSchema};
67///
68/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
69///
70/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
71/// let column = Column::from_qualified_name("t1.c1");
72/// assert!(df_schema.has_column(&column));
73///
74/// // Can also access qualified fields with unqualified name, if it's unambiguous
75/// let column = Column::from_qualified_name("c1");
76/// assert!(df_schema.has_column(&column));
77/// ```
78///
79/// # Creating unqualified schemas
80///
81/// Create an unqualified schema using TryFrom:
82///
83/// ```rust
84/// use arrow::datatypes::{DataType, Field, Schema};
85/// use datafusion_common::{Column, DFSchema};
86///
87/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
88///
89/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
90/// let column = Column::new_unqualified("c1");
91/// assert!(df_schema.has_column(&column));
92/// ```
93///
94/// # Converting back to Arrow schema
95///
96/// Use the `Into` trait to convert `DFSchema` into an Arrow schema:
97///
98/// ```rust
99/// use arrow::datatypes::{Field, Schema};
100/// use datafusion_common::DFSchema;
101/// use std::collections::HashMap;
102///
103/// let df_schema = DFSchema::from_unqualified_fields(
104///     vec![Field::new("c1", arrow::datatypes::DataType::Int32, false)].into(),
105///     HashMap::new(),
106/// )
107/// .unwrap();
108/// let schema: &Schema = df_schema.as_arrow();
109/// assert_eq!(schema.fields().len(), 1);
110/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// The wrapped Arrow schema, which holds the fields and the schema-level
    /// metadata.
    inner: SchemaRef,
    /// Optional qualifier for each column in this schema, in the same order as
    /// `self.inner.fields()`: entry `i` belongs to field `i`, and `None`
    /// means that field is unqualified.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}
121
122impl DFSchema {
123    /// Creates an empty `DFSchema`
124    pub fn empty() -> Self {
125        Self {
126            inner: Arc::new(Schema::new([])),
127            field_qualifiers: vec![],
128            functional_dependencies: FunctionalDependencies::empty(),
129        }
130    }
131
132    /// Return a reference to the inner Arrow [`Schema`]
133    ///
134    /// Note this does not have the qualifier information
135    pub fn as_arrow(&self) -> &Schema {
136        self.inner.as_ref()
137    }
138
139    /// Return a reference to the inner Arrow [`SchemaRef`]
140    ///
141    /// Note this does not have the qualifier information
142    pub fn inner(&self) -> &SchemaRef {
143        &self.inner
144    }
145
146    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
147    pub fn new_with_metadata(
148        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
149        metadata: HashMap<String, String>,
150    ) -> Result<Self> {
151        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
152            qualified_fields.into_iter().unzip();
153
154        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
155
156        let dfschema = Self {
157            inner: schema,
158            field_qualifiers: qualifiers,
159            functional_dependencies: FunctionalDependencies::empty(),
160        };
161        dfschema.check_names()?;
162        Ok(dfschema)
163    }
164
165    /// Create a new `DFSchema` from a list of Arrow [Field]s
166    pub fn from_unqualified_fields(
167        fields: Fields,
168        metadata: HashMap<String, String>,
169    ) -> Result<Self> {
170        let field_count = fields.len();
171        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
172        let dfschema = Self {
173            inner: schema,
174            field_qualifiers: vec![None; field_count],
175            functional_dependencies: FunctionalDependencies::empty(),
176        };
177        dfschema.check_names()?;
178        Ok(dfschema)
179    }
180
181    /// Create a `DFSchema` from an Arrow schema and a given qualifier
182    ///
183    /// To create a schema from an Arrow schema without a qualifier, use
184    /// `DFSchema::try_from`.
185    pub fn try_from_qualified_schema(
186        qualifier: impl Into<TableReference>,
187        schema: &Schema,
188    ) -> Result<Self> {
189        let qualifier = qualifier.into();
190        let schema = DFSchema {
191            inner: schema.clone().into(),
192            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
193            functional_dependencies: FunctionalDependencies::empty(),
194        };
195        schema.check_names()?;
196        Ok(schema)
197    }
198
199    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
200    pub fn from_field_specific_qualified_schema(
201        qualifiers: Vec<Option<TableReference>>,
202        schema: &SchemaRef,
203    ) -> Result<Self> {
204        let dfschema = Self {
205            inner: Arc::clone(schema),
206            field_qualifiers: qualifiers,
207            functional_dependencies: FunctionalDependencies::empty(),
208        };
209        dfschema.check_names()?;
210        Ok(dfschema)
211    }
212
213    /// Return the same schema, where all fields have a given qualifier.
214    pub fn with_field_specific_qualified_schema(
215        &self,
216        qualifiers: Vec<Option<TableReference>>,
217    ) -> Result<Self> {
218        if qualifiers.len() != self.fields().len() {
219            return _plan_err!(
220                "Number of qualifiers must match number of fields. Expected {}, got {}",
221                self.fields().len(),
222                qualifiers.len()
223            );
224        }
225        Ok(DFSchema {
226            inner: Arc::clone(&self.inner),
227            field_qualifiers: qualifiers,
228            functional_dependencies: self.functional_dependencies.clone(),
229        })
230    }
231
232    /// Check if the schema have some fields with the same name
233    pub fn check_names(&self) -> Result<()> {
234        let mut qualified_names = BTreeSet::new();
235        let mut unqualified_names = BTreeSet::new();
236
237        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
238            if let Some(qualifier) = qualifier {
239                if !qualified_names.insert((qualifier, field.name())) {
240                    return _schema_err!(SchemaError::DuplicateQualifiedField {
241                        qualifier: Box::new(qualifier.clone()),
242                        name: field.name().to_string(),
243                    });
244                }
245            } else if !unqualified_names.insert(field.name()) {
246                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
247                    name: field.name().to_string()
248                });
249            }
250        }
251
252        for (qualifier, name) in qualified_names {
253            if unqualified_names.contains(name) {
254                return _schema_err!(SchemaError::AmbiguousReference {
255                    field: Box::new(Column::new(Some(qualifier.clone()), name))
256                });
257            }
258        }
259        Ok(())
260    }
261
262    /// Assigns functional dependencies.
263    pub fn with_functional_dependencies(
264        mut self,
265        functional_dependencies: FunctionalDependencies,
266    ) -> Result<Self> {
267        if functional_dependencies.is_valid(self.inner.fields.len()) {
268            self.functional_dependencies = functional_dependencies;
269            Ok(self)
270        } else {
271            _plan_err!(
272                "Invalid functional dependency: {:?}",
273                functional_dependencies
274            )
275        }
276    }
277
278    /// Create a new schema that contains the fields from this schema followed by the fields
279    /// from the supplied schema. An error will be returned if there are duplicate field names.
280    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
281        let mut schema_builder = SchemaBuilder::new();
282        schema_builder.extend(self.inner.fields().iter().cloned());
283        schema_builder.extend(schema.fields().iter().cloned());
284        let new_schema = schema_builder.finish();
285
286        let mut new_metadata = self.inner.metadata.clone();
287        new_metadata.extend(schema.inner.metadata.clone());
288        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
289
290        let mut new_qualifiers = self.field_qualifiers.clone();
291        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
292
293        let new_self = Self {
294            inner: Arc::new(new_schema_with_metadata),
295            field_qualifiers: new_qualifiers,
296            functional_dependencies: FunctionalDependencies::empty(),
297        };
298        new_self.check_names()?;
299        Ok(new_self)
300    }
301
302    /// Modify this schema by appending the fields from the supplied schema, ignoring any
303    /// duplicate fields.
304    ///
305    /// ## Merge Precedence
306    ///
307    /// **Schema-level metadata**: Metadata from both schemas is merged.
308    /// If both schemas have the same metadata key, the value from the `other_schema` parameter takes precedence.
309    ///
310    /// **Field-level merging**: Only non-duplicate fields are added. This means that the
311    /// `self` fields will always take precedence over the `other_schema` fields.
312    /// Duplicate field detection is based on:
313    /// - For qualified fields: both qualifier and field name must match
314    /// - For unqualified fields: only field name needs to match
315    ///
316    /// Take note how the precedence for fields & metadata merging differs;
317    /// merging prefers fields from `self` but prefers metadata from `other_schema`.
318    pub fn merge(&mut self, other_schema: &DFSchema) {
319        if other_schema.inner.fields.is_empty() {
320            return;
321        }
322
323        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
324            self.iter().collect();
325        let self_unqualified_names: HashSet<&str> = self
326            .inner
327            .fields
328            .iter()
329            .map(|field| field.name().as_str())
330            .collect();
331
332        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
333        let mut qualifiers = Vec::new();
334        for (qualifier, field) in other_schema.iter() {
335            // skip duplicate columns
336            let duplicated_field = match qualifier {
337                Some(q) => self_fields.contains(&(Some(q), field)),
338                // for unqualified columns, check as unqualified name
339                None => self_unqualified_names.contains(field.name().as_str()),
340            };
341            if !duplicated_field {
342                schema_builder.push(Arc::clone(field));
343                qualifiers.push(qualifier.cloned());
344            }
345        }
346        let mut metadata = self.inner.metadata.clone();
347        metadata.extend(other_schema.inner.metadata.clone());
348
349        let finished = schema_builder.finish();
350        let finished_with_metadata = finished.with_metadata(metadata);
351        self.inner = finished_with_metadata.into();
352        self.field_qualifiers.extend(qualifiers);
353    }
354
355    /// Get a list of fields for this schema
356    pub fn fields(&self) -> &Fields {
357        &self.inner.fields
358    }
359
360    /// Returns a reference to [`FieldRef`] for a column at specific index
361    /// within the schema.
362    ///
363    /// See also [Self::qualified_field] to get both qualifier and field
364    pub fn field(&self, i: usize) -> &FieldRef {
365        &self.inner.fields[i]
366    }
367
368    /// Returns the qualifier (if any) and [`FieldRef`] for a column at specific
369    /// index within the schema.
370    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &FieldRef) {
371        (self.field_qualifiers[i].as_ref(), self.field(i))
372    }
373
374    pub fn index_of_column_by_name(
375        &self,
376        qualifier: Option<&TableReference>,
377        name: &str,
378    ) -> Option<usize> {
379        let mut matches = self
380            .iter()
381            .enumerate()
382            .filter(|(_, (q, f))| match (qualifier, q) {
383                // field to lookup is qualified.
384                // current field is qualified and not shared between relations, compare both
385                // qualifier and name.
386                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
387                // field to lookup is qualified but current field is unqualified.
388                (Some(_), None) => false,
389                // field to lookup is unqualified, no need to compare qualifier
390                (None, Some(_)) | (None, None) => f.name() == name,
391            })
392            .map(|(idx, _)| idx);
393        matches.next()
394    }
395
396    /// Find the index of the column with the given qualifier and name,
397    /// returning `None` if not found
398    ///
399    /// See [Self::index_of_column] for a version that returns an error if the
400    /// column is not found
401    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
402        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
403    }
404
405    /// Find the index of the column with the given qualifier and name,
406    /// returning `Err` if not found
407    ///
408    /// See [Self::maybe_index_of_column] for a version that returns `None` if
409    /// the column is not found
410    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
411        self.maybe_index_of_column(col)
412            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
413    }
414
415    /// Check if the column is in the current schema
416    pub fn is_column_from_schema(&self, col: &Column) -> bool {
417        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
418            .is_some()
419    }
420
421    /// Find the [`FieldRef`] with the given name and optional qualifier
422    pub fn field_with_name(
423        &self,
424        qualifier: Option<&TableReference>,
425        name: &str,
426    ) -> Result<&FieldRef> {
427        if let Some(qualifier) = qualifier {
428            self.field_with_qualified_name(qualifier, name)
429        } else {
430            self.field_with_unqualified_name(name)
431        }
432    }
433
434    /// Find the qualified field with the given name
435    pub fn qualified_field_with_name(
436        &self,
437        qualifier: Option<&TableReference>,
438        name: &str,
439    ) -> Result<(Option<&TableReference>, &FieldRef)> {
440        if let Some(qualifier) = qualifier {
441            let idx = self
442                .index_of_column_by_name(Some(qualifier), name)
443                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
444            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
445        } else {
446            self.qualified_field_with_unqualified_name(name)
447        }
448    }
449
450    /// Find all fields having the given qualifier
451    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&FieldRef> {
452        self.iter()
453            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
454            .map(|(_, f)| f)
455            .collect()
456    }
457
458    /// Find all fields indices having the given qualifier
459    pub fn fields_indices_with_qualified(
460        &self,
461        qualifier: &TableReference,
462    ) -> Vec<usize> {
463        self.iter()
464            .enumerate()
465            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
466            .collect()
467    }
468
469    /// Find all fields that match the given name
470    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&FieldRef> {
471        self.fields()
472            .iter()
473            .filter(|field| field.name() == name)
474            .collect()
475    }
476
477    /// Find all fields that match the given name and return them with their qualifier
478    pub fn qualified_fields_with_unqualified_name(
479        &self,
480        name: &str,
481    ) -> Vec<(Option<&TableReference>, &FieldRef)> {
482        self.iter()
483            .filter(|(_, field)| field.name() == name)
484            .collect()
485    }
486
487    /// Find all fields that match the given name and convert to column
488    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
489        self.iter()
490            .filter(|(_, field)| field.name() == name)
491            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
492            .collect()
493    }
494
495    /// Return all `Column`s for the schema
496    pub fn columns(&self) -> Vec<Column> {
497        self.iter()
498            .map(|(qualifier, field)| {
499                Column::new(qualifier.cloned(), field.name().clone())
500            })
501            .collect()
502    }
503
504    /// Find the qualified field with the given unqualified name
505    pub fn qualified_field_with_unqualified_name(
506        &self,
507        name: &str,
508    ) -> Result<(Option<&TableReference>, &FieldRef)> {
509        let matches = self.qualified_fields_with_unqualified_name(name);
510        match matches.len() {
511            0 => Err(unqualified_field_not_found(name, self)),
512            1 => Ok((matches[0].0, matches[0].1)),
513            _ => {
514                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
515                // Because name may generate from Alias/... . It means that it don't own qualifier.
516                // For example:
517                //             Join on id = b.id
518                // Project a.id as id   TableScan b id
519                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
520                // one field without qualifier, we should return it.
521                let fields_without_qualifier = matches
522                    .iter()
523                    .filter(|(q, _)| q.is_none())
524                    .collect::<Vec<_>>();
525                if fields_without_qualifier.len() == 1 {
526                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
527                } else {
528                    _schema_err!(SchemaError::AmbiguousReference {
529                        field: Box::new(Column::new_unqualified(name.to_string()))
530                    })
531                }
532            }
533        }
534    }
535
536    /// Find the field with the given name
537    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&FieldRef> {
538        self.qualified_field_with_unqualified_name(name)
539            .map(|(_, field)| field)
540    }
541
542    /// Find the field with the given qualified name
543    pub fn field_with_qualified_name(
544        &self,
545        qualifier: &TableReference,
546        name: &str,
547    ) -> Result<&FieldRef> {
548        let idx = self
549            .index_of_column_by_name(Some(qualifier), name)
550            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
551
552        Ok(self.field(idx))
553    }
554
555    /// Find the field with the given qualified column
556    pub fn qualified_field_from_column(
557        &self,
558        column: &Column,
559    ) -> Result<(Option<&TableReference>, &FieldRef)> {
560        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
561    }
562
563    /// Find if the field exists with the given name
564    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
565        self.fields().iter().any(|field| field.name() == name)
566    }
567
568    /// Find if the field exists with the given qualified name
569    pub fn has_column_with_qualified_name(
570        &self,
571        qualifier: &TableReference,
572        name: &str,
573    ) -> bool {
574        self.iter()
575            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
576    }
577
578    /// Find if the field exists with the given qualified column
579    pub fn has_column(&self, column: &Column) -> bool {
580        match &column.relation {
581            Some(r) => self.has_column_with_qualified_name(r, &column.name),
582            None => self.has_column_with_unqualified_name(&column.name),
583        }
584    }
585
586    /// Check to see if unqualified field names matches field names in Arrow schema
587    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
588        self.inner
589            .fields
590            .iter()
591            .zip(arrow_schema.fields().iter())
592            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
593    }
594
595    /// Check to see if fields in 2 Arrow schemas are compatible
596    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
597    pub fn check_arrow_schema_type_compatible(
598        &self,
599        arrow_schema: &Schema,
600    ) -> Result<()> {
601        let self_arrow_schema = self.as_arrow();
602        self_arrow_schema
603            .fields()
604            .iter()
605            .zip(arrow_schema.fields().iter())
606            .try_for_each(|(l_field, r_field)| {
607                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
608                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
609                                r_field.name(),
610                                r_field.data_type(),
611                                l_field.name(),
612                                l_field.data_type())
613                } else {
614                    Ok(())
615                }
616            })
617    }
618
619    /// Returns true if the two schemas have the same qualified named
620    /// fields with logically equivalent data types. Returns false otherwise.
621    ///
622    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
623    /// equivalence checking.
624    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
625        if self.fields().len() != other.fields().len() {
626            return false;
627        }
628        let self_fields = self.iter();
629        let other_fields = other.iter();
630        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
631            q1 == q2
632                && f1.name() == f2.name()
633                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
634        })
635    }
636
637    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
638    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
639        self.has_equivalent_names_and_types(other).is_ok()
640    }
641
642    /// Returns Ok if the two schemas have the same qualified named
643    /// fields with the compatible data types.
644    ///
645    /// Returns an `Err` with a message otherwise.
646    ///
647    /// This is a specialized version of Eq that ignores differences in
648    /// nullability and metadata.
649    ///
650    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
651    /// logical type checking, which for example would consider a dictionary
652    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
653    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
654        // case 1 : schema length mismatch
655        if self.fields().len() != other.fields().len() {
656            _plan_err!(
657                "Schema mismatch: the schema length are not same \
658            Expected schema length: {}, got: {}",
659                self.fields().len(),
660                other.fields().len()
661            )
662        } else {
663            // case 2 : schema length match, but fields mismatch
664            // check if the fields name are the same and have the same data types
665            self.fields()
666                .iter()
667                .zip(other.fields().iter())
668                .try_for_each(|(f1, f2)| {
669                    if f1.name() != f2.name()
670                        || (!DFSchema::datatype_is_semantically_equal(
671                            f1.data_type(),
672                            f2.data_type(),
673                        ))
674                    {
675                        _plan_err!(
676                            "Schema mismatch: Expected field '{}' with type {}, \
677                            but got '{}' with type {}.",
678                            f1.name(),
679                            f1.data_type(),
680                            f2.name(),
681                            f2.data_type()
682                        )
683                    } else {
684                        Ok(())
685                    }
686                })
687        }
688    }
689
690    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
691    /// than datatype_is_semantically_equal in that different representations of same data can be
692    /// logically but not semantically equivalent. Semantically equivalent types are always also
693    /// logically equivalent. For example:
694    /// - a Dictionary<K,V> type is logically equal to a plain V type
695    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
696    /// - Utf8 and Utf8View are logically equal
697    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
698        // check nested fields
699        match (dt1, dt2) {
700            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
701                Self::datatype_is_logically_equal(v1.as_ref(), v2.as_ref())
702            }
703            (DataType::Dictionary(_, v1), othertype)
704            | (othertype, DataType::Dictionary(_, v1)) => {
705                Self::datatype_is_logically_equal(v1.as_ref(), othertype)
706            }
707            (DataType::List(f1), DataType::List(f2))
708            | (DataType::LargeList(f1), DataType::LargeList(f2))
709            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
710                // Don't compare the names of the technical inner field
711                // Usually "item" but that's not mandated
712                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
713            }
714            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
715                // Don't compare the names of the technical inner fields
716                // Usually "entries", "key", "value" but that's not mandated
717                match (f1.data_type(), f2.data_type()) {
718                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
719                        f1_inner.len() == f2_inner.len()
720                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
721                                Self::datatype_is_logically_equal(
722                                    f1.data_type(),
723                                    f2.data_type(),
724                                )
725                            })
726                    }
727                    _ => panic!("Map type should have an inner struct field"),
728                }
729            }
730            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
731                let iter1 = fields1.iter();
732                let iter2 = fields2.iter();
733                fields1.len() == fields2.len() &&
734                        // all fields have to be the same
735                    iter1
736                    .zip(iter2)
737                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
738            }
739            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
740                let iter1 = fields1.iter();
741                let iter2 = fields2.iter();
742                fields1.len() == fields2.len() &&
743                    // all fields have to be the same
744                    iter1
745                        .zip(iter2)
746                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
747            }
748            // Utf8 and Utf8View are logically equivalent
749            (DataType::Utf8, DataType::Utf8View) => true,
750            (DataType::Utf8View, DataType::Utf8) => true,
751            _ => Self::datatype_is_semantically_equal(dt1, dt2),
752        }
753    }
754
755    /// Returns true of two [`DataType`]s are semantically equal (same
756    /// name and type), ignoring both metadata and nullability, decimal precision/scale,
757    /// and timezone time units/timezones.
758    ///
759    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
760    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
761        // check nested fields
762        match (dt1, dt2) {
763            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
764                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
765                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
766            }
767            (DataType::List(f1), DataType::List(f2))
768            | (DataType::LargeList(f1), DataType::LargeList(f2))
769            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
770                // Don't compare the names of the technical inner field
771                // Usually "item" but that's not mandated
772                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
773            }
774            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
775                // Don't compare the names of the technical inner fields
776                // Usually "entries", "key", "value" but that's not mandated
777                match (f1.data_type(), f2.data_type()) {
778                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
779                        f1_inner.len() == f2_inner.len()
780                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
781                                Self::datatype_is_semantically_equal(
782                                    f1.data_type(),
783                                    f2.data_type(),
784                                )
785                            })
786                    }
787                    _ => panic!("Map type should have an inner struct field"),
788                }
789            }
790            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
791                let iter1 = fields1.iter();
792                let iter2 = fields2.iter();
793                fields1.len() == fields2.len() &&
794                        // all fields have to be the same
795                    iter1
796                    .zip(iter2)
797                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
798            }
799            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
800                let iter1 = fields1.iter();
801                let iter2 = fields2.iter();
802                fields1.len() == fields2.len() &&
803                    // all fields have to be the same
804                    iter1
805                        .zip(iter2)
806                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
807            }
808            (
809                DataType::Decimal32(_l_precision, _l_scale),
810                DataType::Decimal32(_r_precision, _r_scale),
811            ) => true,
812            (
813                DataType::Decimal64(_l_precision, _l_scale),
814                DataType::Decimal64(_r_precision, _r_scale),
815            ) => true,
816            (
817                DataType::Decimal128(_l_precision, _l_scale),
818                DataType::Decimal128(_r_precision, _r_scale),
819            ) => true,
820            (
821                DataType::Decimal256(_l_precision, _l_scale),
822                DataType::Decimal256(_r_precision, _r_scale),
823            ) => true,
824            (
825                DataType::Timestamp(_l_time_unit, _l_timezone),
826                DataType::Timestamp(_r_time_unit, _r_timezone),
827            ) => true,
828            _ => dt1 == dt2,
829        }
830    }
831
832    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
833        f1.name() == f2.name()
834            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
835    }
836
837    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
838        f1.name() == f2.name()
839            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
840    }
841
842    /// Strip all field qualifier in schema
843    pub fn strip_qualifiers(self) -> Self {
844        DFSchema {
845            field_qualifiers: vec![None; self.inner.fields.len()],
846            inner: self.inner,
847            functional_dependencies: self.functional_dependencies,
848        }
849    }
850
851    /// Replace all field qualifier with new value in schema
852    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
853        let qualifier = qualifier.into();
854        DFSchema {
855            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
856            inner: self.inner,
857            functional_dependencies: self.functional_dependencies,
858        }
859    }
860
861    /// Get list of fully-qualified field names in this schema
862    pub fn field_names(&self) -> Vec<String> {
863        self.iter()
864            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
865            .collect::<Vec<_>>()
866    }
867
868    /// Get metadata of this schema
869    pub fn metadata(&self) -> &HashMap<String, String> {
870        &self.inner.metadata
871    }
872
873    /// Get functional dependencies
874    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
875        &self.functional_dependencies
876    }
877
878    /// Iterate over the qualifiers and fields in the DFSchema
879    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
880        self.field_qualifiers
881            .iter()
882            .zip(self.inner.fields().iter())
883            .map(|(qualifier, field)| (qualifier.as_ref(), field))
884    }
885    /// Returns a tree-like string representation of the schema.
886    ///
887    /// This method formats the schema
888    /// with a tree-like structure showing field names, types, and nullability.
889    ///
890    /// # Example
891    ///
892    /// ```
893    /// use arrow::datatypes::{DataType, Field, Schema};
894    /// use datafusion_common::DFSchema;
895    /// use std::collections::HashMap;
896    ///
897    /// let schema = DFSchema::from_unqualified_fields(
898    ///     vec![
899    ///         Field::new("id", DataType::Int32, false),
900    ///         Field::new("name", DataType::Utf8, true),
901    ///     ]
902    ///     .into(),
903    ///     HashMap::new(),
904    /// )
905    /// .unwrap();
906    ///
907    /// assert_eq!(
908    ///     schema.tree_string().to_string(),
909    ///     r#"root
910    ///  |-- id: int32 (nullable = false)
911    ///  |-- name: utf8 (nullable = true)"#
912    /// );
913    /// ```
914    pub fn tree_string(&self) -> impl Display + '_ {
915        let mut result = String::from("root\n");
916
917        for (qualifier, field) in self.iter() {
918            let field_name = match qualifier {
919                Some(q) => format!("{}.{}", q, field.name()),
920                None => field.name().to_string(),
921            };
922
923            format_field_with_indent(
924                &mut result,
925                &field_name,
926                field.data_type(),
927                field.is_nullable(),
928                " ",
929            );
930        }
931
932        // Remove the trailing newline
933        if result.ends_with('\n') {
934            result.pop();
935        }
936
937        result
938    }
939}
940
941/// Format field with proper nested indentation for complex types
942fn format_field_with_indent(
943    result: &mut String,
944    field_name: &str,
945    data_type: &DataType,
946    nullable: bool,
947    indent: &str,
948) {
949    let nullable_str = nullable.to_string().to_lowercase();
950    let child_indent = format!("{indent}|    ");
951
952    match data_type {
953        DataType::List(field) => {
954            result.push_str(&format!(
955                "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
956            ));
957            format_field_with_indent(
958                result,
959                field.name(),
960                field.data_type(),
961                field.is_nullable(),
962                &child_indent,
963            );
964        }
965        DataType::LargeList(field) => {
966            result.push_str(&format!(
967                "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
968            ));
969            format_field_with_indent(
970                result,
971                field.name(),
972                field.data_type(),
973                field.is_nullable(),
974                &child_indent,
975            );
976        }
977        DataType::FixedSizeList(field, _size) => {
978            result.push_str(&format!(
979                "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
980            ));
981            format_field_with_indent(
982                result,
983                field.name(),
984                field.data_type(),
985                field.is_nullable(),
986                &child_indent,
987            );
988        }
989        DataType::Map(field, _) => {
990            result.push_str(&format!(
991                "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
992            ));
993            if let DataType::Struct(inner_fields) = field.data_type()
994                && inner_fields.len() == 2
995            {
996                format_field_with_indent(
997                    result,
998                    "key",
999                    inner_fields[0].data_type(),
1000                    inner_fields[0].is_nullable(),
1001                    &child_indent,
1002                );
1003                let value_contains_null = field.is_nullable().to_string().to_lowercase();
1004                // Handle complex value types properly
1005                match inner_fields[1].data_type() {
1006                    DataType::Struct(_)
1007                    | DataType::List(_)
1008                    | DataType::LargeList(_)
1009                    | DataType::FixedSizeList(_, _)
1010                    | DataType::Map(_, _) => {
1011                        format_field_with_indent(
1012                            result,
1013                            "value",
1014                            inner_fields[1].data_type(),
1015                            inner_fields[1].is_nullable(),
1016                            &child_indent,
1017                        );
1018                    }
1019                    _ => {
1020                        result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1021                                format_simple_data_type(inner_fields[1].data_type())));
1022                    }
1023                }
1024            }
1025        }
1026        DataType::Struct(fields) => {
1027            result.push_str(&format!(
1028                "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1029            ));
1030            for struct_field in fields {
1031                format_field_with_indent(
1032                    result,
1033                    struct_field.name(),
1034                    struct_field.data_type(),
1035                    struct_field.is_nullable(),
1036                    &child_indent,
1037                );
1038            }
1039        }
1040        _ => {
1041            let type_str = format_simple_data_type(data_type);
1042            result.push_str(&format!(
1043                "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1044            ));
1045        }
1046    }
1047}
1048
1049/// Format simple DataType in lowercase format (for leaf nodes)
1050fn format_simple_data_type(data_type: &DataType) -> String {
1051    match data_type {
1052        DataType::Boolean => "boolean".to_string(),
1053        DataType::Int8 => "int8".to_string(),
1054        DataType::Int16 => "int16".to_string(),
1055        DataType::Int32 => "int32".to_string(),
1056        DataType::Int64 => "int64".to_string(),
1057        DataType::UInt8 => "uint8".to_string(),
1058        DataType::UInt16 => "uint16".to_string(),
1059        DataType::UInt32 => "uint32".to_string(),
1060        DataType::UInt64 => "uint64".to_string(),
1061        DataType::Float16 => "float16".to_string(),
1062        DataType::Float32 => "float32".to_string(),
1063        DataType::Float64 => "float64".to_string(),
1064        DataType::Utf8 => "utf8".to_string(),
1065        DataType::LargeUtf8 => "large_utf8".to_string(),
1066        DataType::Binary => "binary".to_string(),
1067        DataType::LargeBinary => "large_binary".to_string(),
1068        DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1069        DataType::Date32 => "date32".to_string(),
1070        DataType::Date64 => "date64".to_string(),
1071        DataType::Time32(_) => "time32".to_string(),
1072        DataType::Time64(_) => "time64".to_string(),
1073        DataType::Timestamp(_, tz) => match tz {
1074            Some(tz_str) => format!("timestamp ({tz_str})"),
1075            None => "timestamp".to_string(),
1076        },
1077        DataType::Interval(_) => "interval".to_string(),
1078        DataType::Dictionary(_, value_type) => {
1079            format_simple_data_type(value_type.as_ref())
1080        }
1081        DataType::Decimal32(precision, scale) => {
1082            format!("decimal32({precision}, {scale})")
1083        }
1084        DataType::Decimal64(precision, scale) => {
1085            format!("decimal64({precision}, {scale})")
1086        }
1087        DataType::Decimal128(precision, scale) => {
1088            format!("decimal128({precision}, {scale})")
1089        }
1090        DataType::Decimal256(precision, scale) => {
1091            format!("decimal256({precision}, {scale})")
1092        }
1093        DataType::Null => "null".to_string(),
1094        _ => format!("{data_type}").to_lowercase(),
1095    }
1096}
1097
/// Allow DFSchema to be converted into an Arrow `&Schema`
impl AsRef<Schema> for DFSchema {
    /// Borrow this schema as an Arrow [`Schema`] by delegating to `as_arrow`
    fn as_ref(&self) -> &Schema {
        self.as_arrow()
    }
}
1104
/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
/// example)
impl AsRef<SchemaRef> for DFSchema {
    /// Borrow the reference-counted Arrow schema by delegating to `inner`
    fn as_ref(&self) -> &SchemaRef {
        self.inner()
    }
}
1112
/// Create a `DFSchema` from an Arrow schema
impl TryFrom<Schema> for DFSchema {
    type Error = DataFusionError;
    /// Wrap the owned schema in an `Arc` and convert that reference-counted
    /// schema instead
    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
        Self::try_from(Arc::new(schema))
    }
}
1120
1121impl TryFrom<SchemaRef> for DFSchema {
1122    type Error = DataFusionError;
1123    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1124        let field_count = schema.fields.len();
1125        let dfschema = Self {
1126            inner: schema,
1127            field_qualifiers: vec![None; field_count],
1128            functional_dependencies: FunctionalDependencies::empty(),
1129        };
1130        // Without checking names, because schema here may have duplicate field names.
1131        // For example, Partial AggregateMode will generate duplicate field names from
1132        // state_fields.
1133        // See <https://github.com/apache/datafusion/issues/17715>
1134        // dfschema.check_names()?;
1135        Ok(dfschema)
1136    }
1137}
1138
1139impl From<DFSchema> for SchemaRef {
1140    fn from(dfschema: DFSchema) -> Self {
1141        Arc::clone(&dfschema.inner)
1142    }
1143}
1144
1145// Hashing refers to a subset of fields considered in PartialEq.
1146impl Hash for DFSchema {
1147    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1148        self.inner.fields.hash(state);
1149        self.inner.metadata.len().hash(state); // HashMap is not hashable
1150    }
1151}
1152
1153/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
1154pub trait ToDFSchema
1155where
1156    Self: Sized,
1157{
1158    /// Attempt to create a DSSchema
1159    fn to_dfschema(self) -> Result<DFSchema>;
1160
1161    /// Attempt to create a DSSchemaRef
1162    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1163        Ok(Arc::new(self.to_dfschema()?))
1164    }
1165}
1166
/// Convert an owned Arrow [`Schema`] through the `TryFrom<Schema>` conversion
impl ToDFSchema for Schema {
    fn to_dfschema(self) -> Result<DFSchema> {
        DFSchema::try_from(self)
    }
}
1172
/// Convert a reference-counted Arrow schema through the `TryFrom<SchemaRef>`
/// conversion
impl ToDFSchema for SchemaRef {
    fn to_dfschema(self) -> Result<DFSchema> {
        DFSchema::try_from(self)
    }
}
1178
1179impl ToDFSchema for Vec<Field> {
1180    fn to_dfschema(self) -> Result<DFSchema> {
1181        let field_count = self.len();
1182        let schema = Schema {
1183            fields: self.into(),
1184            metadata: HashMap::new(),
1185        };
1186        let dfschema = DFSchema {
1187            inner: schema.into(),
1188            field_qualifiers: vec![None; field_count],
1189            functional_dependencies: FunctionalDependencies::empty(),
1190        };
1191        Ok(dfschema)
1192    }
1193}
1194
1195impl Display for DFSchema {
1196    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1197        write!(
1198            f,
1199            "fields:[{}], metadata:{:?}",
1200            self.iter()
1201                .map(|(q, f)| qualified_name(q, f.name()))
1202                .collect::<Vec<String>>()
1203                .join(", "),
1204            self.inner.metadata
1205        )
1206    }
1207}
1208
1209/// Provides schema information needed by certain methods of `Expr`
1210/// (defined in the datafusion-common crate).
1211///
1212/// Note that this trait is implemented for &[DFSchema] which is
1213/// widely used in the DataFusion codebase.
1214pub trait ExprSchema: std::fmt::Debug {
1215    /// Is this column reference nullable?
1216    fn nullable(&self, col: &Column) -> Result<bool> {
1217        Ok(self.field_from_column(col)?.is_nullable())
1218    }
1219
1220    /// What is the datatype of this column?
1221    fn data_type(&self, col: &Column) -> Result<&DataType> {
1222        Ok(self.field_from_column(col)?.data_type())
1223    }
1224
1225    /// Returns the column's optional metadata.
1226    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1227        Ok(self.field_from_column(col)?.metadata())
1228    }
1229
1230    /// Return the column's datatype and nullability
1231    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1232        let field = self.field_from_column(col)?;
1233        Ok((field.data_type(), field.is_nullable()))
1234    }
1235
1236    // Return the column's field
1237    fn field_from_column(&self, col: &Column) -> Result<&FieldRef>;
1238}
1239
// Implement `ExprSchema` for every type that can be borrowed as a `DFSchema`
// through `AsRef` (such as `Arc<DFSchema>`). Every method simply forwards to
// the borrowed `DFSchema`.
impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
    fn nullable(&self, col: &Column) -> Result<bool> {
        self.as_ref().nullable(col)
    }

    fn data_type(&self, col: &Column) -> Result<&DataType> {
        self.as_ref().data_type(col)
    }

    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
        // Called through the trait explicitly: `DFSchema` also has an inherent
        // `metadata()` method, which method-call syntax would pick instead
        ExprSchema::metadata(self.as_ref(), col)
    }

    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
        self.as_ref().data_type_and_nullable(col)
    }

    fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
        self.as_ref().field_from_column(col)
    }
}
1262
1263impl ExprSchema for DFSchema {
1264    fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1265        match &col.relation {
1266            Some(r) => self.field_with_qualified_name(r, &col.name),
1267            None => self.field_with_unqualified_name(&col.name),
1268        }
1269    }
1270}
1271
/// DataFusion-specific extensions to [`Schema`].
pub trait SchemaExt {
    /// This is a specialized version of Eq that ignores differences
    /// in nullability and metadata.
    ///
    /// It works the same as [`DFSchema::equivalent_names_and_types`].
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns `Ok(())` if the two schemas have the same number of fields and
    /// each pair of fields has the same name and logically equivalent data
    /// types. Returns an error otherwise (the implementation for [`Schema`]
    /// reports a plan error).
    ///
    /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
    /// equivalence checking.
    ///
    /// It is only used by insert into cases.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1289
1290impl SchemaExt for Schema {
1291    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1292        if self.fields().len() != other.fields().len() {
1293            return false;
1294        }
1295
1296        self.fields()
1297            .iter()
1298            .zip(other.fields().iter())
1299            .all(|(f1, f2)| {
1300                f1.name() == f2.name()
1301                    && DFSchema::datatype_is_semantically_equal(
1302                        f1.data_type(),
1303                        f2.data_type(),
1304                    )
1305            })
1306    }
1307
1308    // It is only used by insert into cases.
1309    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1310        // case 1 : schema length mismatch
1311        if self.fields().len() != other.fields().len() {
1312            _plan_err!(
1313                "Inserting query must have the same schema length as the table. \
1314            Expected table schema length: {}, got: {}",
1315                self.fields().len(),
1316                other.fields().len()
1317            )
1318        } else {
1319            // case 2 : schema length match, but fields mismatch
1320            // check if the fields name are the same and have the same data types
1321            self.fields()
1322                .iter()
1323                .zip(other.fields().iter())
1324                .try_for_each(|(f1, f2)| {
1325                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1326                        _plan_err!(
1327                            "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1328                            but got '{}' with type {}.",
1329                            f1.name(),
1330                            f1.data_type(),
1331                            f2.name(),
1332                            f2.data_type())
1333                    } else {
1334                        Ok(())
1335                    }
1336                })
1337        }
1338    }
1339}
1340
1341pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1342    match qualifier {
1343        Some(q) => format!("{q}.{name}"),
1344        None => name.to_string(),
1345    }
1346}
1347
1348#[cfg(test)]
1349mod tests {
1350    use crate::assert_contains;
1351
1352    use super::*;
1353
1354    #[test]
1355    fn qualifier_in_name() -> Result<()> {
1356        let col = Column::from_name("t1.c0");
1357        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1358        // lookup with unqualified name "t1.c0"
1359        let err = schema.index_of_column(&col).unwrap_err();
1360        let expected = "Schema error: No field named \"t1.c0\". \
1361            Column names are case sensitive. \
1362            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1363            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1364            Did you mean 't1.c0'?.";
1365        assert_eq!(err.strip_backtrace(), expected);
1366        Ok(())
1367    }
1368
1369    #[test]
1370    fn quoted_qualifiers_in_name() -> Result<()> {
1371        let col = Column::from_name("t1.c0");
1372        let schema = DFSchema::try_from_qualified_schema(
1373            "t1",
1374            &Schema::new(vec![
1375                Field::new("CapitalColumn", DataType::Boolean, true),
1376                Field::new("field.with.period", DataType::Boolean, true),
1377            ]),
1378        )?;
1379
1380        // lookup with unqualified name "t1.c0"
1381        let err = schema.index_of_column(&col).unwrap_err();
1382        let expected = "Schema error: No field named \"t1.c0\". \
1383            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1384        assert_eq!(err.strip_backtrace(), expected);
1385        Ok(())
1386    }
1387
1388    #[test]
1389    fn from_unqualified_schema() -> Result<()> {
1390        let schema = DFSchema::try_from(test_schema_1())?;
1391        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1392        Ok(())
1393    }
1394
1395    #[test]
1396    fn from_qualified_schema() -> Result<()> {
1397        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1398        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1399        Ok(())
1400    }
1401
1402    #[test]
1403    fn test_from_field_specific_qualified_schema() -> Result<()> {
1404        let schema = DFSchema::from_field_specific_qualified_schema(
1405            vec![Some("t1".into()), None],
1406            &Arc::new(Schema::new(vec![
1407                Field::new("c0", DataType::Boolean, true),
1408                Field::new("c1", DataType::Boolean, true),
1409            ])),
1410        )?;
1411        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1412        Ok(())
1413    }
1414
1415    #[test]
1416    fn test_from_qualified_fields() -> Result<()> {
1417        let schema = DFSchema::new_with_metadata(
1418            vec![
1419                (
1420                    Some("t0".into()),
1421                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1422                ),
1423                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1424            ],
1425            HashMap::new(),
1426        )?;
1427        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1428        Ok(())
1429    }
1430
    /// The Arrow view of a qualified schema carries only the bare field names;
    /// the qualifier does not appear in the rendered Arrow schema.
    #[test]
    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let arrow_schema = schema.as_arrow();
        // Inline snapshot of the Arrow schema's `Display` output
        insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
        Ok(())
    }
1438
1439    #[test]
1440    fn join_qualified() -> Result<()> {
1441        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1442        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1443        let join = left.join(&right)?;
1444        assert_eq!(
1445            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1446            join.to_string()
1447        );
1448        // test valid access
1449        assert!(
1450            join.field_with_qualified_name(&TableReference::bare("t1"), "c0")
1451                .is_ok()
1452        );
1453        assert!(
1454            join.field_with_qualified_name(&TableReference::bare("t2"), "c0")
1455                .is_ok()
1456        );
1457        // test invalid access
1458        assert!(join.field_with_unqualified_name("c0").is_err());
1459        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1460        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1461        Ok(())
1462    }
1463
1464    #[test]
1465    fn join_qualified_duplicate() -> Result<()> {
1466        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1467        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1468        let join = left.join(&right);
1469        assert_eq!(
1470            join.unwrap_err().strip_backtrace(),
1471            "Schema error: Schema contains duplicate qualified field name t1.c0",
1472        );
1473        Ok(())
1474    }
1475
1476    #[test]
1477    fn join_unqualified_duplicate() -> Result<()> {
1478        let left = DFSchema::try_from(test_schema_1())?;
1479        let right = DFSchema::try_from(test_schema_1())?;
1480        let join = left.join(&right);
1481        assert_eq!(
1482            join.unwrap_err().strip_backtrace(),
1483            "Schema error: Schema contains duplicate unqualified field name c0"
1484        );
1485        Ok(())
1486    }
1487
1488    #[test]
1489    fn join_mixed() -> Result<()> {
1490        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1491        let right = DFSchema::try_from(test_schema_2())?;
1492        let join = left.join(&right)?;
1493        assert_eq!(
1494            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1495            join.to_string()
1496        );
1497        // test valid access
1498        assert!(
1499            join.field_with_qualified_name(&TableReference::bare("t1"), "c0")
1500                .is_ok()
1501        );
1502        assert!(join.field_with_unqualified_name("c0").is_ok());
1503        assert!(join.field_with_unqualified_name("c100").is_ok());
1504        assert!(join.field_with_name(None, "c100").is_ok());
1505        // test invalid access
1506        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1507        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1508        assert!(
1509            join.field_with_qualified_name(&TableReference::bare(""), "c100")
1510                .is_err()
1511        );
1512        Ok(())
1513    }
1514
1515    #[test]
1516    fn join_mixed_duplicate() -> Result<()> {
1517        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1518        let right = DFSchema::try_from(test_schema_1())?;
1519        let join = left.join(&right);
1520        assert_contains!(
1521            join.unwrap_err().to_string(),
1522            "Schema error: Schema contains qualified \
1523                          field name t1.c0 and unqualified field name c0 which would be ambiguous"
1524        );
1525        Ok(())
1526    }
1527
1528    #[test]
1529    fn helpful_error_messages() -> Result<()> {
1530        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1531        let expected_help = "Valid fields are t1.c0, t1.c1.";
1532        assert_contains!(
1533            schema
1534                .field_with_qualified_name(&TableReference::bare("x"), "y")
1535                .unwrap_err()
1536                .to_string(),
1537            expected_help
1538        );
1539        assert_contains!(
1540            schema
1541                .field_with_unqualified_name("y")
1542                .unwrap_err()
1543                .to_string(),
1544            expected_help
1545        );
1546        assert!(schema.index_of_column_by_name(None, "y").is_none());
1547        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1548
1549        Ok(())
1550    }
1551
1552    #[test]
1553    fn select_without_valid_fields() {
1554        let schema = DFSchema::empty();
1555
1556        let col = Column::from_qualified_name("t1.c0");
1557        let err = schema.index_of_column(&col).unwrap_err();
1558        let expected = "Schema error: No field named t1.c0.";
1559        assert_eq!(err.strip_backtrace(), expected);
1560
1561        // the same check without qualifier
1562        let col = Column::from_name("c0");
1563        let err = schema.index_of_column(&col).err().unwrap();
1564        let expected = "Schema error: No field named c0.";
1565        assert_eq!(err.strip_backtrace(), expected);
1566    }
1567
1568    #[test]
1569    fn into() {
1570        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1571        let arrow_schema = Schema::new_with_metadata(
1572            vec![Field::new("c0", DataType::Int64, true)],
1573            test_metadata(),
1574        );
1575        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1576
1577        let df_schema = DFSchema {
1578            inner: Arc::clone(&arrow_schema_ref),
1579            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1580            functional_dependencies: FunctionalDependencies::empty(),
1581        };
1582        let df_schema_ref = Arc::new(df_schema.clone());
1583
1584        {
1585            let arrow_schema = arrow_schema.clone();
1586            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1587
1588            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1589            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1590        }
1591
1592        {
1593            let arrow_schema = arrow_schema.clone();
1594            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1595
1596            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1597            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1598        }
1599
1600        // Now, consume the refs
1601        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1602        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1603    }
1604
1605    fn test_schema_1() -> Schema {
1606        Schema::new(vec![
1607            Field::new("c0", DataType::Boolean, true),
1608            Field::new("c1", DataType::Boolean, true),
1609        ])
1610    }
1611    #[test]
1612    fn test_dfschema_to_schema_conversion() {
1613        let mut a_metadata = HashMap::new();
1614        a_metadata.insert("key".to_string(), "value".to_string());
1615        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1616
1617        let mut b_metadata = HashMap::new();
1618        b_metadata.insert("key".to_string(), "value".to_string());
1619        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1620
1621        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1622
1623        let df_schema = DFSchema {
1624            inner: Arc::clone(&schema),
1625            field_qualifiers: vec![None; schema.fields.len()],
1626            functional_dependencies: FunctionalDependencies::empty(),
1627        };
1628
1629        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1630    }
1631
1632    #[test]
1633    fn test_contain_column() -> Result<()> {
1634        // qualified exists
1635        {
1636            let col = Column::from_qualified_name("t1.c0");
1637            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1638            assert!(schema.is_column_from_schema(&col));
1639        }
1640
1641        // qualified not exists
1642        {
1643            let col = Column::from_qualified_name("t1.c2");
1644            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1645            assert!(!schema.is_column_from_schema(&col));
1646        }
1647
1648        // unqualified exists
1649        {
1650            let col = Column::from_name("c0");
1651            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1652            assert!(schema.is_column_from_schema(&col));
1653        }
1654
1655        // unqualified not exists
1656        {
1657            let col = Column::from_name("c2");
1658            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1659            assert!(!schema.is_column_from_schema(&col));
1660        }
1661
1662        Ok(())
1663    }
1664
1665    #[test]
1666    fn test_datatype_is_logically_equal() {
1667        assert!(DFSchema::datatype_is_logically_equal(
1668            &DataType::Int8,
1669            &DataType::Int8
1670        ));
1671
1672        assert!(!DFSchema::datatype_is_logically_equal(
1673            &DataType::Int8,
1674            &DataType::Int16
1675        ));
1676
1677        // Test lists
1678
1679        // Succeeds if both have the same element type, disregards names and nullability
1680        assert!(DFSchema::datatype_is_logically_equal(
1681            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1682            &DataType::List(Field::new("element", DataType::Int8, false).into())
1683        ));
1684
1685        // Fails if element type is different
1686        assert!(!DFSchema::datatype_is_logically_equal(
1687            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1688            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1689        ));
1690
1691        // Test maps
1692        let map_field = DataType::Map(
1693            Field::new(
1694                "entries",
1695                DataType::Struct(Fields::from(vec![
1696                    Field::new("key", DataType::Int8, false),
1697                    Field::new("value", DataType::Int8, true),
1698                ])),
1699                true,
1700            )
1701            .into(),
1702            true,
1703        );
1704
1705        // Succeeds if both maps have the same key and value types, disregards names and nullability
1706        assert!(DFSchema::datatype_is_logically_equal(
1707            &map_field,
1708            &DataType::Map(
1709                Field::new(
1710                    "pairs",
1711                    DataType::Struct(Fields::from(vec![
1712                        Field::new("one", DataType::Int8, false),
1713                        Field::new("two", DataType::Int8, false)
1714                    ])),
1715                    true
1716                )
1717                .into(),
1718                true
1719            )
1720        ));
1721        // Fails if value type is different
1722        assert!(!DFSchema::datatype_is_logically_equal(
1723            &map_field,
1724            &DataType::Map(
1725                Field::new(
1726                    "entries",
1727                    DataType::Struct(Fields::from(vec![
1728                        Field::new("key", DataType::Int8, false),
1729                        Field::new("value", DataType::Int16, true)
1730                    ])),
1731                    true
1732                )
1733                .into(),
1734                true
1735            )
1736        ));
1737
1738        // Fails if key type is different
1739        assert!(!DFSchema::datatype_is_logically_equal(
1740            &map_field,
1741            &DataType::Map(
1742                Field::new(
1743                    "entries",
1744                    DataType::Struct(Fields::from(vec![
1745                        Field::new("key", DataType::Int16, false),
1746                        Field::new("value", DataType::Int8, true)
1747                    ])),
1748                    true
1749                )
1750                .into(),
1751                true
1752            )
1753        ));
1754
1755        // Test structs
1756
1757        let struct_field = DataType::Struct(Fields::from(vec![
1758            Field::new("a", DataType::Int8, true),
1759            Field::new("b", DataType::Int8, true),
1760        ]));
1761
1762        // Succeeds if both have same names and datatypes, ignores nullability
1763        assert!(DFSchema::datatype_is_logically_equal(
1764            &struct_field,
1765            &DataType::Struct(Fields::from(vec![
1766                Field::new("a", DataType::Int8, false),
1767                Field::new("b", DataType::Int8, true),
1768            ]))
1769        ));
1770
1771        // Fails if field names are different
1772        assert!(!DFSchema::datatype_is_logically_equal(
1773            &struct_field,
1774            &DataType::Struct(Fields::from(vec![
1775                Field::new("x", DataType::Int8, true),
1776                Field::new("y", DataType::Int8, true),
1777            ]))
1778        ));
1779
1780        // Fails if types are different
1781        assert!(!DFSchema::datatype_is_logically_equal(
1782            &struct_field,
1783            &DataType::Struct(Fields::from(vec![
1784                Field::new("a", DataType::Int16, true),
1785                Field::new("b", DataType::Int8, true),
1786            ]))
1787        ));
1788
1789        // Fails if more or less fields
1790        assert!(!DFSchema::datatype_is_logically_equal(
1791            &struct_field,
1792            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1793        ));
1794    }
1795
1796    #[test]
1797    fn test_datatype_is_logically_equivalent_to_dictionary() {
1798        // Dictionary is logically equal to its value type
1799        assert!(DFSchema::datatype_is_logically_equal(
1800            &DataType::Utf8,
1801            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1802        ));
1803
1804        // Dictionary is logically equal to the logically equivalent value type
1805        assert!(DFSchema::datatype_is_logically_equal(
1806            &DataType::Utf8View,
1807            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1808        ));
1809
1810        assert!(DFSchema::datatype_is_logically_equal(
1811            &DataType::Dictionary(
1812                Box::new(DataType::Int32),
1813                Box::new(DataType::List(
1814                    Field::new("element", DataType::Utf8, false).into()
1815                ))
1816            ),
1817            &DataType::Dictionary(
1818                Box::new(DataType::Int32),
1819                Box::new(DataType::List(
1820                    Field::new("element", DataType::Utf8View, false).into()
1821                ))
1822            )
1823        ));
1824    }
1825
1826    #[test]
1827    fn test_datatype_is_semantically_equal() {
1828        assert!(DFSchema::datatype_is_semantically_equal(
1829            &DataType::Int8,
1830            &DataType::Int8
1831        ));
1832
1833        assert!(!DFSchema::datatype_is_semantically_equal(
1834            &DataType::Int8,
1835            &DataType::Int16
1836        ));
1837
1838        // Succeeds if decimal precision and scale are different
1839        assert!(DFSchema::datatype_is_semantically_equal(
1840            &DataType::Decimal32(1, 2),
1841            &DataType::Decimal32(2, 1),
1842        ));
1843
1844        assert!(DFSchema::datatype_is_semantically_equal(
1845            &DataType::Decimal64(1, 2),
1846            &DataType::Decimal64(2, 1),
1847        ));
1848
1849        assert!(DFSchema::datatype_is_semantically_equal(
1850            &DataType::Decimal128(1, 2),
1851            &DataType::Decimal128(2, 1),
1852        ));
1853
1854        assert!(DFSchema::datatype_is_semantically_equal(
1855            &DataType::Decimal256(1, 2),
1856            &DataType::Decimal256(2, 1),
1857        ));
1858
1859        // Any two timestamp types should match
1860        assert!(DFSchema::datatype_is_semantically_equal(
1861            &DataType::Timestamp(
1862                arrow::datatypes::TimeUnit::Microsecond,
1863                Some("UTC".into())
1864            ),
1865            &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1866        ));
1867
1868        // Test lists
1869
1870        // Succeeds if both have the same element type, disregards names and nullability
1871        assert!(DFSchema::datatype_is_semantically_equal(
1872            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1873            &DataType::List(Field::new("element", DataType::Int8, false).into())
1874        ));
1875
1876        // Fails if element type is different
1877        assert!(!DFSchema::datatype_is_semantically_equal(
1878            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1879            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1880        ));
1881
1882        // Test maps
1883        let map_field = DataType::Map(
1884            Field::new(
1885                "entries",
1886                DataType::Struct(Fields::from(vec![
1887                    Field::new("key", DataType::Int8, false),
1888                    Field::new("value", DataType::Int8, true),
1889                ])),
1890                true,
1891            )
1892            .into(),
1893            true,
1894        );
1895
1896        // Succeeds if both maps have the same key and value types, disregards names and nullability
1897        assert!(DFSchema::datatype_is_semantically_equal(
1898            &map_field,
1899            &DataType::Map(
1900                Field::new(
1901                    "pairs",
1902                    DataType::Struct(Fields::from(vec![
1903                        Field::new("one", DataType::Int8, false),
1904                        Field::new("two", DataType::Int8, false)
1905                    ])),
1906                    true
1907                )
1908                .into(),
1909                true
1910            )
1911        ));
1912        // Fails if value type is different
1913        assert!(!DFSchema::datatype_is_semantically_equal(
1914            &map_field,
1915            &DataType::Map(
1916                Field::new(
1917                    "entries",
1918                    DataType::Struct(Fields::from(vec![
1919                        Field::new("key", DataType::Int8, false),
1920                        Field::new("value", DataType::Int16, true)
1921                    ])),
1922                    true
1923                )
1924                .into(),
1925                true
1926            )
1927        ));
1928
1929        // Fails if key type is different
1930        assert!(!DFSchema::datatype_is_semantically_equal(
1931            &map_field,
1932            &DataType::Map(
1933                Field::new(
1934                    "entries",
1935                    DataType::Struct(Fields::from(vec![
1936                        Field::new("key", DataType::Int16, false),
1937                        Field::new("value", DataType::Int8, true)
1938                    ])),
1939                    true
1940                )
1941                .into(),
1942                true
1943            )
1944        ));
1945
1946        // Test structs
1947
1948        let struct_field = DataType::Struct(Fields::from(vec![
1949            Field::new("a", DataType::Int8, true),
1950            Field::new("b", DataType::Int8, true),
1951        ]));
1952
1953        // Succeeds if both have same names and datatypes, ignores nullability
1954        assert!(DFSchema::datatype_is_logically_equal(
1955            &struct_field,
1956            &DataType::Struct(Fields::from(vec![
1957                Field::new("a", DataType::Int8, false),
1958                Field::new("b", DataType::Int8, true),
1959            ]))
1960        ));
1961
1962        // Fails if field names are different
1963        assert!(!DFSchema::datatype_is_logically_equal(
1964            &struct_field,
1965            &DataType::Struct(Fields::from(vec![
1966                Field::new("x", DataType::Int8, true),
1967                Field::new("y", DataType::Int8, true),
1968            ]))
1969        ));
1970
1971        // Fails if types are different
1972        assert!(!DFSchema::datatype_is_logically_equal(
1973            &struct_field,
1974            &DataType::Struct(Fields::from(vec![
1975                Field::new("a", DataType::Int16, true),
1976                Field::new("b", DataType::Int8, true),
1977            ]))
1978        ));
1979
1980        // Fails if more or less fields
1981        assert!(!DFSchema::datatype_is_logically_equal(
1982            &struct_field,
1983            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1984        ));
1985    }
1986
1987    #[test]
1988    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1989        // Dictionary is not semantically equal to its value type
1990        assert!(!DFSchema::datatype_is_semantically_equal(
1991            &DataType::Utf8,
1992            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1993        ));
1994    }
1995
1996    fn test_schema_2() -> Schema {
1997        Schema::new(vec![
1998            Field::new("c100", DataType::Boolean, true),
1999            Field::new("c101", DataType::Boolean, true),
2000        ])
2001    }
2002
2003    fn test_metadata() -> HashMap<String, String> {
2004        test_metadata_n(2)
2005    }
2006
2007    fn test_metadata_n(n: usize) -> HashMap<String, String> {
2008        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
2009    }
2010
2011    #[test]
2012    fn test_print_schema_unqualified() {
2013        let schema = DFSchema::from_unqualified_fields(
2014            vec![
2015                Field::new("id", DataType::Int32, false),
2016                Field::new("name", DataType::Utf8, true),
2017                Field::new("age", DataType::Int64, true),
2018                Field::new("active", DataType::Boolean, false),
2019            ]
2020            .into(),
2021            HashMap::new(),
2022        )
2023        .unwrap();
2024
2025        let output = schema.tree_string();
2026
2027        insta::assert_snapshot!(output, @r"
2028        root
2029         |-- id: int32 (nullable = false)
2030         |-- name: utf8 (nullable = true)
2031         |-- age: int64 (nullable = true)
2032         |-- active: boolean (nullable = false)
2033        ");
2034    }
2035
2036    #[test]
2037    fn test_print_schema_qualified() {
2038        let schema = DFSchema::try_from_qualified_schema(
2039            "table1",
2040            &Schema::new(vec![
2041                Field::new("id", DataType::Int32, false),
2042                Field::new("name", DataType::Utf8, true),
2043            ]),
2044        )
2045        .unwrap();
2046
2047        let output = schema.tree_string();
2048
2049        insta::assert_snapshot!(output, @r"
2050        root
2051         |-- table1.id: int32 (nullable = false)
2052         |-- table1.name: utf8 (nullable = true)
2053        ");
2054    }
2055
2056    #[test]
2057    fn test_print_schema_complex_types() {
2058        let struct_field = Field::new(
2059            "address",
2060            DataType::Struct(Fields::from(vec![
2061                Field::new("street", DataType::Utf8, true),
2062                Field::new("city", DataType::Utf8, true),
2063            ])),
2064            true,
2065        );
2066
2067        let list_field = Field::new(
2068            "tags",
2069            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2070            true,
2071        );
2072
2073        let schema = DFSchema::from_unqualified_fields(
2074            vec![
2075                Field::new("id", DataType::Int32, false),
2076                struct_field,
2077                list_field,
2078                Field::new("score", DataType::Decimal128(10, 2), true),
2079            ]
2080            .into(),
2081            HashMap::new(),
2082        )
2083        .unwrap();
2084
2085        let output = schema.tree_string();
2086        insta::assert_snapshot!(output, @r"
2087        root
2088         |-- id: int32 (nullable = false)
2089         |-- address: struct (nullable = true)
2090         |    |-- street: utf8 (nullable = true)
2091         |    |-- city: utf8 (nullable = true)
2092         |-- tags: list (nullable = true)
2093         |    |-- item: utf8 (nullable = true)
2094         |-- score: decimal128(10, 2) (nullable = true)
2095        ");
2096    }
2097
2098    #[test]
2099    fn test_print_schema_empty() {
2100        let schema = DFSchema::empty();
2101        let output = schema.tree_string();
2102        insta::assert_snapshot!(output, @"root");
2103    }
2104
2105    #[test]
2106    fn test_print_schema_deeply_nested_types() {
2107        // Create a deeply nested structure to test indentation and complex type formatting
2108        let inner_struct = Field::new(
2109            "inner",
2110            DataType::Struct(Fields::from(vec![
2111                Field::new("level1", DataType::Utf8, true),
2112                Field::new("level2", DataType::Int32, false),
2113            ])),
2114            true,
2115        );
2116
2117        let nested_list = Field::new(
2118            "nested_list",
2119            DataType::List(Arc::new(Field::new(
2120                "item",
2121                DataType::Struct(Fields::from(vec![
2122                    Field::new("id", DataType::Int64, false),
2123                    Field::new("value", DataType::Float64, true),
2124                ])),
2125                true,
2126            ))),
2127            true,
2128        );
2129
2130        let map_field = Field::new(
2131            "map_data",
2132            DataType::Map(
2133                Arc::new(Field::new(
2134                    "entries",
2135                    DataType::Struct(Fields::from(vec![
2136                        Field::new("key", DataType::Utf8, false),
2137                        Field::new(
2138                            "value",
2139                            DataType::List(Arc::new(Field::new(
2140                                "item",
2141                                DataType::Int32,
2142                                true,
2143                            ))),
2144                            true,
2145                        ),
2146                    ])),
2147                    false,
2148                )),
2149                false,
2150            ),
2151            true,
2152        );
2153
2154        let schema = DFSchema::from_unqualified_fields(
2155            vec![
2156                Field::new("simple_field", DataType::Utf8, true),
2157                inner_struct,
2158                nested_list,
2159                map_field,
2160                Field::new(
2161                    "timestamp_field",
2162                    DataType::Timestamp(
2163                        arrow::datatypes::TimeUnit::Microsecond,
2164                        Some("UTC".into()),
2165                    ),
2166                    false,
2167                ),
2168            ]
2169            .into(),
2170            HashMap::new(),
2171        )
2172        .unwrap();
2173
2174        let output = schema.tree_string();
2175
2176        insta::assert_snapshot!(output, @r"
2177        root
2178         |-- simple_field: utf8 (nullable = true)
2179         |-- inner: struct (nullable = true)
2180         |    |-- level1: utf8 (nullable = true)
2181         |    |-- level2: int32 (nullable = false)
2182         |-- nested_list: list (nullable = true)
2183         |    |-- item: struct (nullable = true)
2184         |    |    |-- id: int64 (nullable = false)
2185         |    |    |-- value: float64 (nullable = true)
2186         |-- map_data: map (nullable = true)
2187         |    |-- key: utf8 (nullable = false)
2188         |    |-- value: list (nullable = true)
2189         |    |    |-- item: int32 (nullable = true)
2190         |-- timestamp_field: timestamp (UTC) (nullable = false)
2191        ");
2192    }
2193
2194    #[test]
2195    fn test_print_schema_mixed_qualified_unqualified() {
2196        // Test a schema with mixed qualified and unqualified fields
2197        let schema = DFSchema::new_with_metadata(
2198            vec![
2199                (
2200                    Some("table1".into()),
2201                    Arc::new(Field::new("id", DataType::Int32, false)),
2202                ),
2203                (None, Arc::new(Field::new("name", DataType::Utf8, true))),
2204                (
2205                    Some("table2".into()),
2206                    Arc::new(Field::new("score", DataType::Float64, true)),
2207                ),
2208                (
2209                    None,
2210                    Arc::new(Field::new("active", DataType::Boolean, false)),
2211                ),
2212            ],
2213            HashMap::new(),
2214        )
2215        .unwrap();
2216
2217        let output = schema.tree_string();
2218
2219        insta::assert_snapshot!(output, @r"
2220        root
2221         |-- table1.id: int32 (nullable = false)
2222         |-- name: utf8 (nullable = true)
2223         |-- table2.score: float64 (nullable = true)
2224         |-- active: boolean (nullable = false)
2225        ");
2226    }
2227
2228    #[test]
2229    fn test_print_schema_array_of_map() {
2230        // Test the specific example from user feedback: array of map
2231        let map_field = Field::new(
2232            "entries",
2233            DataType::Struct(Fields::from(vec![
2234                Field::new("key", DataType::Utf8, false),
2235                Field::new("value", DataType::Utf8, false),
2236            ])),
2237            false,
2238        );
2239
2240        let array_of_map_field = Field::new(
2241            "array_map_field",
2242            DataType::List(Arc::new(Field::new(
2243                "item",
2244                DataType::Map(Arc::new(map_field), false),
2245                false,
2246            ))),
2247            false,
2248        );
2249
2250        let schema = DFSchema::from_unqualified_fields(
2251            vec![array_of_map_field].into(),
2252            HashMap::new(),
2253        )
2254        .unwrap();
2255
2256        let output = schema.tree_string();
2257
2258        insta::assert_snapshot!(output, @r"
2259        root
2260         |-- array_map_field: list (nullable = false)
2261         |    |-- item: map (nullable = false)
2262         |    |    |-- key: utf8 (nullable = false)
2263         |    |    |-- value: utf8 (nullable = false)
2264        ");
2265    }
2266
2267    #[test]
2268    fn test_print_schema_complex_type_combinations() {
2269        // Test various combinations of list, struct, and map types
2270
2271        // List of structs
2272        let list_of_structs = Field::new(
2273            "list_of_structs",
2274            DataType::List(Arc::new(Field::new(
2275                "item",
2276                DataType::Struct(Fields::from(vec![
2277                    Field::new("id", DataType::Int32, false),
2278                    Field::new("name", DataType::Utf8, true),
2279                    Field::new("score", DataType::Float64, true),
2280                ])),
2281                true,
2282            ))),
2283            true,
2284        );
2285
2286        // Struct containing lists
2287        let struct_with_lists = Field::new(
2288            "struct_with_lists",
2289            DataType::Struct(Fields::from(vec![
2290                Field::new(
2291                    "tags",
2292                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2293                    true,
2294                ),
2295                Field::new(
2296                    "scores",
2297                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
2298                    false,
2299                ),
2300                Field::new("metadata", DataType::Utf8, true),
2301            ])),
2302            false,
2303        );
2304
2305        // Map with struct values
2306        let map_with_struct_values = Field::new(
2307            "map_with_struct_values",
2308            DataType::Map(
2309                Arc::new(Field::new(
2310                    "entries",
2311                    DataType::Struct(Fields::from(vec![
2312                        Field::new("key", DataType::Utf8, false),
2313                        Field::new(
2314                            "value",
2315                            DataType::Struct(Fields::from(vec![
2316                                Field::new("count", DataType::Int64, false),
2317                                Field::new("active", DataType::Boolean, true),
2318                            ])),
2319                            true,
2320                        ),
2321                    ])),
2322                    false,
2323                )),
2324                false,
2325            ),
2326            true,
2327        );
2328
2329        // List of maps
2330        let list_of_maps = Field::new(
2331            "list_of_maps",
2332            DataType::List(Arc::new(Field::new(
2333                "item",
2334                DataType::Map(
2335                    Arc::new(Field::new(
2336                        "entries",
2337                        DataType::Struct(Fields::from(vec![
2338                            Field::new("key", DataType::Utf8, false),
2339                            Field::new("value", DataType::Int32, true),
2340                        ])),
2341                        false,
2342                    )),
2343                    false,
2344                ),
2345                true,
2346            ))),
2347            true,
2348        );
2349
2350        // Deeply nested: struct containing list of structs containing maps
2351        let deeply_nested = Field::new(
2352            "deeply_nested",
2353            DataType::Struct(Fields::from(vec![
2354                Field::new("level1", DataType::Utf8, true),
2355                Field::new(
2356                    "level2",
2357                    DataType::List(Arc::new(Field::new(
2358                        "item",
2359                        DataType::Struct(Fields::from(vec![
2360                            Field::new("id", DataType::Int32, false),
2361                            Field::new(
2362                                "properties",
2363                                DataType::Map(
2364                                    Arc::new(Field::new(
2365                                        "entries",
2366                                        DataType::Struct(Fields::from(vec![
2367                                            Field::new("key", DataType::Utf8, false),
2368                                            Field::new("value", DataType::Float64, true),
2369                                        ])),
2370                                        false,
2371                                    )),
2372                                    false,
2373                                ),
2374                                true,
2375                            ),
2376                        ])),
2377                        true,
2378                    ))),
2379                    false,
2380                ),
2381            ])),
2382            true,
2383        );
2384
2385        let schema = DFSchema::from_unqualified_fields(
2386            vec![
2387                list_of_structs,
2388                struct_with_lists,
2389                map_with_struct_values,
2390                list_of_maps,
2391                deeply_nested,
2392            ]
2393            .into(),
2394            HashMap::new(),
2395        )
2396        .unwrap();
2397
2398        let output = schema.tree_string();
2399
2400        insta::assert_snapshot!(output, @r"
2401        root
2402         |-- list_of_structs: list (nullable = true)
2403         |    |-- item: struct (nullable = true)
2404         |    |    |-- id: int32 (nullable = false)
2405         |    |    |-- name: utf8 (nullable = true)
2406         |    |    |-- score: float64 (nullable = true)
2407         |-- struct_with_lists: struct (nullable = false)
2408         |    |-- tags: list (nullable = true)
2409         |    |    |-- item: utf8 (nullable = true)
2410         |    |-- scores: list (nullable = false)
2411         |    |    |-- item: int32 (nullable = true)
2412         |    |-- metadata: utf8 (nullable = true)
2413         |-- map_with_struct_values: map (nullable = true)
2414         |    |-- key: utf8 (nullable = false)
2415         |    |-- value: struct (nullable = true)
2416         |    |    |-- count: int64 (nullable = false)
2417         |    |    |-- active: boolean (nullable = true)
2418         |-- list_of_maps: list (nullable = true)
2419         |    |-- item: map (nullable = true)
2420         |    |    |-- key: utf8 (nullable = false)
2421         |    |    |-- value: int32 (nullable = false)
2422         |-- deeply_nested: struct (nullable = true)
2423         |    |-- level1: utf8 (nullable = true)
2424         |    |-- level2: list (nullable = false)
2425         |    |    |-- item: struct (nullable = true)
2426         |    |    |    |-- id: int32 (nullable = false)
2427         |    |    |    |-- properties: map (nullable = true)
2428         |    |    |    |    |-- key: utf8 (nullable = false)
2429         |    |    |    |    |-- value: float64 (nullable = false)
2430        ");
2431    }
2432
/// Checks how `tree_string` renders a flat schema of less common Arrow types:
/// null, (large) binary and utf8, fixed-size containers, decimals, dates and
/// times.
#[test]
fn test_print_schema_edge_case_types() {
    use arrow::datatypes::TimeUnit;

    // The only nested type in this schema; the snapshot expects its child
    // field on its own, further-indented line.
    let five_int32_items =
        DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 5);

    let edge_case_fields = vec![
        Field::new("null_field", DataType::Null, true),
        Field::new("binary_field", DataType::Binary, false),
        Field::new("large_binary", DataType::LargeBinary, true),
        Field::new("large_utf8", DataType::LargeUtf8, false),
        Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
        Field::new("fixed_size_list", five_int32_items, false),
        Field::new("decimal32", DataType::Decimal32(9, 4), true),
        Field::new("decimal64", DataType::Decimal64(9, 4), true),
        Field::new("decimal128", DataType::Decimal128(18, 4), true),
        Field::new("decimal256", DataType::Decimal256(38, 10), false),
        Field::new("date32", DataType::Date32, true),
        Field::new("date64", DataType::Date64, false),
        Field::new("time32_seconds", DataType::Time32(TimeUnit::Second), true),
        Field::new("time64_nanoseconds", DataType::Time64(TimeUnit::Nanosecond), false),
    ];

    let schema =
        DFSchema::from_unqualified_fields(Fields::from(edge_case_fields), HashMap::new())
            .unwrap();
    let rendered = schema.tree_string();

    // Per the expected output below: decimals are printed with precision and
    // scale, while the fixed-size binary width, the fixed-size list length and
    // the time units are not part of the rendered type name.
    insta::assert_snapshot!(rendered, @r"
    root
     |-- null_field: null (nullable = true)
     |-- binary_field: binary (nullable = false)
     |-- large_binary: large_binary (nullable = true)
     |-- large_utf8: large_utf8 (nullable = false)
     |-- fixed_size_binary: fixed_size_binary (nullable = true)
     |-- fixed_size_list: fixed size list (nullable = false)
     |    |-- item: int32 (nullable = true)
     |-- decimal32: decimal32(9, 4) (nullable = true)
     |-- decimal64: decimal64(9, 4) (nullable = true)
     |-- decimal128: decimal128(18, 4) (nullable = true)
     |-- decimal256: decimal256(38, 10) (nullable = false)
     |-- date32: date32 (nullable = true)
     |-- date64: date64 (nullable = false)
     |-- time32_seconds: time32 (nullable = true)
     |-- time64_nanoseconds: time64 (nullable = false)
    ");
}
2494}