datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28    field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29    SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
37/// A reference-counted reference to a [DFSchema].
38pub type DFSchemaRef = Arc<DFSchema>;
39
40/// DFSchema wraps an Arrow schema and adds relation names.
41///
42/// The schema may hold the fields across multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified fields must be unique not only amongst themselves, but also must
47/// have a distinct name from any qualified field names. This allows finding a
48/// qualified field by name to be possible, so long as there aren't multiple
49/// qualified fields with the same name.
50///
51/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
52///
53/// # Creating qualified schemas
54///
55/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
56/// an Arrow schema.
57///
58/// ```rust
59/// use arrow::datatypes::{DataType, Field, Schema};
60/// use datafusion_common::{Column, DFSchema};
61///
62/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
63///
64/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
65/// let column = Column::from_qualified_name("t1.c1");
66/// assert!(df_schema.has_column(&column));
67///
68/// // Can also access qualified fields with unqualified name, if it's unambiguous
69/// let column = Column::from_qualified_name("c1");
70/// assert!(df_schema.has_column(&column));
71/// ```
72///
73/// # Creating unqualified schemas
74///
75/// Create an unqualified schema using TryFrom:
76///
77/// ```rust
78/// use arrow::datatypes::{DataType, Field, Schema};
79/// use datafusion_common::{Column, DFSchema};
80///
81/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
82///
83/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
84/// let column = Column::new_unqualified("c1");
85/// assert!(df_schema.has_column(&column));
86/// ```
87///
88/// # Converting back to Arrow schema
89///
90/// Use the `Into` trait to convert `DFSchema` into an Arrow schema:
91///
92/// ```rust
93/// use arrow::datatypes::{Field, Schema};
94/// use datafusion_common::DFSchema;
95/// use std::collections::HashMap;
96///
97/// let df_schema = DFSchema::from_unqualified_fields(
98///     vec![Field::new("c1", arrow::datatypes::DataType::Int32, false)].into(),
99///     HashMap::new(),
100/// )
101/// .unwrap();
102/// let schema: &Schema = df_schema.as_arrow();
103/// assert_eq!(schema.fields().len(), 1);
104/// ```
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct DFSchema {
107    /// Inner Arrow schema reference.
108    inner: SchemaRef,
109    /// Optional qualifiers for each column in this schema. In the same order as
110    /// the `self.inner.fields()`
111    field_qualifiers: Vec<Option<TableReference>>,
112    /// Stores functional dependencies in the schema.
113    functional_dependencies: FunctionalDependencies,
114}
115
116impl DFSchema {
117    /// Creates an empty `DFSchema`
118    pub fn empty() -> Self {
119        Self {
120            inner: Arc::new(Schema::new([])),
121            field_qualifiers: vec![],
122            functional_dependencies: FunctionalDependencies::empty(),
123        }
124    }
125
126    /// Return a reference to the inner Arrow [`Schema`]
127    ///
128    /// Note this does not have the qualifier information
129    pub fn as_arrow(&self) -> &Schema {
130        self.inner.as_ref()
131    }
132
133    /// Return a reference to the inner Arrow [`SchemaRef`]
134    ///
135    /// Note this does not have the qualifier information
136    pub fn inner(&self) -> &SchemaRef {
137        &self.inner
138    }
139
140    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
141    pub fn new_with_metadata(
142        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
143        metadata: HashMap<String, String>,
144    ) -> Result<Self> {
145        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
146            qualified_fields.into_iter().unzip();
147
148        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
149
150        let dfschema = Self {
151            inner: schema,
152            field_qualifiers: qualifiers,
153            functional_dependencies: FunctionalDependencies::empty(),
154        };
155        dfschema.check_names()?;
156        Ok(dfschema)
157    }
158
159    /// Create a new `DFSchema` from a list of Arrow [Field]s
160    pub fn from_unqualified_fields(
161        fields: Fields,
162        metadata: HashMap<String, String>,
163    ) -> Result<Self> {
164        let field_count = fields.len();
165        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
166        let dfschema = Self {
167            inner: schema,
168            field_qualifiers: vec![None; field_count],
169            functional_dependencies: FunctionalDependencies::empty(),
170        };
171        dfschema.check_names()?;
172        Ok(dfschema)
173    }
174
175    /// Create a `DFSchema` from an Arrow schema and a given qualifier
176    ///
177    /// To create a schema from an Arrow schema without a qualifier, use
178    /// `DFSchema::try_from`.
179    pub fn try_from_qualified_schema(
180        qualifier: impl Into<TableReference>,
181        schema: &Schema,
182    ) -> Result<Self> {
183        let qualifier = qualifier.into();
184        let schema = DFSchema {
185            inner: schema.clone().into(),
186            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
187            functional_dependencies: FunctionalDependencies::empty(),
188        };
189        schema.check_names()?;
190        Ok(schema)
191    }
192
193    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
194    pub fn from_field_specific_qualified_schema(
195        qualifiers: Vec<Option<TableReference>>,
196        schema: &SchemaRef,
197    ) -> Result<Self> {
198        let dfschema = Self {
199            inner: Arc::clone(schema),
200            field_qualifiers: qualifiers,
201            functional_dependencies: FunctionalDependencies::empty(),
202        };
203        dfschema.check_names()?;
204        Ok(dfschema)
205    }
206
207    /// Return the same schema, where all fields have a given qualifier.
208    pub fn with_field_specific_qualified_schema(
209        &self,
210        qualifiers: Vec<Option<TableReference>>,
211    ) -> Result<Self> {
212        if qualifiers.len() != self.fields().len() {
213            return _plan_err!(
214                "Number of qualifiers must match number of fields. Expected {}, got {}",
215                self.fields().len(),
216                qualifiers.len()
217            );
218        }
219        Ok(DFSchema {
220            inner: Arc::clone(&self.inner),
221            field_qualifiers: qualifiers,
222            functional_dependencies: self.functional_dependencies.clone(),
223        })
224    }
225
226    /// Check if the schema have some fields with the same name
227    pub fn check_names(&self) -> Result<()> {
228        let mut qualified_names = BTreeSet::new();
229        let mut unqualified_names = BTreeSet::new();
230
231        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
232            if let Some(qualifier) = qualifier {
233                if !qualified_names.insert((qualifier, field.name())) {
234                    return _schema_err!(SchemaError::DuplicateQualifiedField {
235                        qualifier: Box::new(qualifier.clone()),
236                        name: field.name().to_string(),
237                    });
238                }
239            } else if !unqualified_names.insert(field.name()) {
240                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
241                    name: field.name().to_string()
242                });
243            }
244        }
245
246        for (qualifier, name) in qualified_names {
247            if unqualified_names.contains(name) {
248                return _schema_err!(SchemaError::AmbiguousReference {
249                    field: Box::new(Column::new(Some(qualifier.clone()), name))
250                });
251            }
252        }
253        Ok(())
254    }
255
256    /// Assigns functional dependencies.
257    pub fn with_functional_dependencies(
258        mut self,
259        functional_dependencies: FunctionalDependencies,
260    ) -> Result<Self> {
261        if functional_dependencies.is_valid(self.inner.fields.len()) {
262            self.functional_dependencies = functional_dependencies;
263            Ok(self)
264        } else {
265            _plan_err!(
266                "Invalid functional dependency: {:?}",
267                functional_dependencies
268            )
269        }
270    }
271
272    /// Create a new schema that contains the fields from this schema followed by the fields
273    /// from the supplied schema. An error will be returned if there are duplicate field names.
274    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
275        let mut schema_builder = SchemaBuilder::new();
276        schema_builder.extend(self.inner.fields().iter().cloned());
277        schema_builder.extend(schema.fields().iter().cloned());
278        let new_schema = schema_builder.finish();
279
280        let mut new_metadata = self.inner.metadata.clone();
281        new_metadata.extend(schema.inner.metadata.clone());
282        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
283
284        let mut new_qualifiers = self.field_qualifiers.clone();
285        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
286
287        let new_self = Self {
288            inner: Arc::new(new_schema_with_metadata),
289            field_qualifiers: new_qualifiers,
290            functional_dependencies: FunctionalDependencies::empty(),
291        };
292        new_self.check_names()?;
293        Ok(new_self)
294    }
295
296    /// Modify this schema by appending the fields from the supplied schema, ignoring any
297    /// duplicate fields.
298    ///
299    /// ## Merge Precedence
300    ///
301    /// **Schema-level metadata**: Metadata from both schemas is merged.
302    /// If both schemas have the same metadata key, the value from the `other_schema` parameter takes precedence.
303    ///
304    /// **Field-level merging**: Only non-duplicate fields are added. This means that the
305    /// `self` fields will always take precedence over the `other_schema` fields.
306    /// Duplicate field detection is based on:
307    /// - For qualified fields: both qualifier and field name must match
308    /// - For unqualified fields: only field name needs to match
309    ///
310    /// Take note how the precedence for fields & metadata merging differs;
311    /// merging prefers fields from `self` but prefers metadata from `other_schema`.
312    pub fn merge(&mut self, other_schema: &DFSchema) {
313        if other_schema.inner.fields.is_empty() {
314            return;
315        }
316
317        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
318            self.iter().collect();
319        let self_unqualified_names: HashSet<&str> = self
320            .inner
321            .fields
322            .iter()
323            .map(|field| field.name().as_str())
324            .collect();
325
326        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
327        let mut qualifiers = Vec::new();
328        for (qualifier, field) in other_schema.iter() {
329            // skip duplicate columns
330            let duplicated_field = match qualifier {
331                Some(q) => self_fields.contains(&(Some(q), field)),
332                // for unqualified columns, check as unqualified name
333                None => self_unqualified_names.contains(field.name().as_str()),
334            };
335            if !duplicated_field {
336                schema_builder.push(Arc::clone(field));
337                qualifiers.push(qualifier.cloned());
338            }
339        }
340        let mut metadata = self.inner.metadata.clone();
341        metadata.extend(other_schema.inner.metadata.clone());
342
343        let finished = schema_builder.finish();
344        let finished_with_metadata = finished.with_metadata(metadata);
345        self.inner = finished_with_metadata.into();
346        self.field_qualifiers.extend(qualifiers);
347    }
348
349    /// Get a list of fields
350    pub fn fields(&self) -> &Fields {
351        &self.inner.fields
352    }
353
354    /// Returns an immutable reference of a specific `Field` instance selected using an
355    /// offset within the internal `fields` vector
356    pub fn field(&self, i: usize) -> &Field {
357        &self.inner.fields[i]
358    }
359
360    /// Returns an immutable reference of a specific `Field` instance selected using an
361    /// offset within the internal `fields` vector and its qualifier
362    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
363        (self.field_qualifiers[i].as_ref(), self.field(i))
364    }
365
366    pub fn index_of_column_by_name(
367        &self,
368        qualifier: Option<&TableReference>,
369        name: &str,
370    ) -> Option<usize> {
371        let mut matches = self
372            .iter()
373            .enumerate()
374            .filter(|(_, (q, f))| match (qualifier, q) {
375                // field to lookup is qualified.
376                // current field is qualified and not shared between relations, compare both
377                // qualifier and name.
378                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
379                // field to lookup is qualified but current field is unqualified.
380                (Some(_), None) => false,
381                // field to lookup is unqualified, no need to compare qualifier
382                (None, Some(_)) | (None, None) => f.name() == name,
383            })
384            .map(|(idx, _)| idx);
385        matches.next()
386    }
387
388    /// Find the index of the column with the given qualifier and name,
389    /// returning `None` if not found
390    ///
391    /// See [Self::index_of_column] for a version that returns an error if the
392    /// column is not found
393    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
394        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
395    }
396
397    /// Find the index of the column with the given qualifier and name,
398    /// returning `Err` if not found
399    ///
400    /// See [Self::maybe_index_of_column] for a version that returns `None` if
401    /// the column is not found
402    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
403        self.maybe_index_of_column(col)
404            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
405    }
406
407    /// Check if the column is in the current schema
408    pub fn is_column_from_schema(&self, col: &Column) -> bool {
409        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
410            .is_some()
411    }
412
413    /// Find the field with the given name
414    pub fn field_with_name(
415        &self,
416        qualifier: Option<&TableReference>,
417        name: &str,
418    ) -> Result<&Field> {
419        if let Some(qualifier) = qualifier {
420            self.field_with_qualified_name(qualifier, name)
421        } else {
422            self.field_with_unqualified_name(name)
423        }
424    }
425
426    /// Find the qualified field with the given name
427    pub fn qualified_field_with_name(
428        &self,
429        qualifier: Option<&TableReference>,
430        name: &str,
431    ) -> Result<(Option<&TableReference>, &Field)> {
432        if let Some(qualifier) = qualifier {
433            let idx = self
434                .index_of_column_by_name(Some(qualifier), name)
435                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
436            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
437        } else {
438            self.qualified_field_with_unqualified_name(name)
439        }
440    }
441
442    /// Find all fields having the given qualifier
443    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
444        self.iter()
445            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
446            .map(|(_, f)| f.as_ref())
447            .collect()
448    }
449
450    /// Find all fields indices having the given qualifier
451    pub fn fields_indices_with_qualified(
452        &self,
453        qualifier: &TableReference,
454    ) -> Vec<usize> {
455        self.iter()
456            .enumerate()
457            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
458            .collect()
459    }
460
461    /// Find all fields that match the given name
462    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
463        self.fields()
464            .iter()
465            .filter(|field| field.name() == name)
466            .map(|f| f.as_ref())
467            .collect()
468    }
469
470    /// Find all fields that match the given name and return them with their qualifier
471    pub fn qualified_fields_with_unqualified_name(
472        &self,
473        name: &str,
474    ) -> Vec<(Option<&TableReference>, &Field)> {
475        self.iter()
476            .filter(|(_, field)| field.name() == name)
477            .map(|(qualifier, field)| (qualifier, field.as_ref()))
478            .collect()
479    }
480
481    /// Find all fields that match the given name and convert to column
482    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
483        self.iter()
484            .filter(|(_, field)| field.name() == name)
485            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
486            .collect()
487    }
488
489    /// Return all `Column`s for the schema
490    pub fn columns(&self) -> Vec<Column> {
491        self.iter()
492            .map(|(qualifier, field)| {
493                Column::new(qualifier.cloned(), field.name().clone())
494            })
495            .collect()
496    }
497
498    /// Find the qualified field with the given unqualified name
499    pub fn qualified_field_with_unqualified_name(
500        &self,
501        name: &str,
502    ) -> Result<(Option<&TableReference>, &Field)> {
503        let matches = self.qualified_fields_with_unqualified_name(name);
504        match matches.len() {
505            0 => Err(unqualified_field_not_found(name, self)),
506            1 => Ok((matches[0].0, matches[0].1)),
507            _ => {
508                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
509                // Because name may generate from Alias/... . It means that it don't own qualifier.
510                // For example:
511                //             Join on id = b.id
512                // Project a.id as id   TableScan b id
513                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
514                // one field without qualifier, we should return it.
515                let fields_without_qualifier = matches
516                    .iter()
517                    .filter(|(q, _)| q.is_none())
518                    .collect::<Vec<_>>();
519                if fields_without_qualifier.len() == 1 {
520                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
521                } else {
522                    _schema_err!(SchemaError::AmbiguousReference {
523                        field: Box::new(Column::new_unqualified(name.to_string()))
524                    })
525                }
526            }
527        }
528    }
529
530    /// Find the field with the given name
531    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
532        self.qualified_field_with_unqualified_name(name)
533            .map(|(_, field)| field)
534    }
535
536    /// Find the field with the given qualified name
537    pub fn field_with_qualified_name(
538        &self,
539        qualifier: &TableReference,
540        name: &str,
541    ) -> Result<&Field> {
542        let idx = self
543            .index_of_column_by_name(Some(qualifier), name)
544            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
545
546        Ok(self.field(idx))
547    }
548
549    /// Find the field with the given qualified column
550    pub fn qualified_field_from_column(
551        &self,
552        column: &Column,
553    ) -> Result<(Option<&TableReference>, &Field)> {
554        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
555    }
556
557    /// Find if the field exists with the given name
558    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
559        self.fields().iter().any(|field| field.name() == name)
560    }
561
562    /// Find if the field exists with the given qualified name
563    pub fn has_column_with_qualified_name(
564        &self,
565        qualifier: &TableReference,
566        name: &str,
567    ) -> bool {
568        self.iter()
569            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
570    }
571
572    /// Find if the field exists with the given qualified column
573    pub fn has_column(&self, column: &Column) -> bool {
574        match &column.relation {
575            Some(r) => self.has_column_with_qualified_name(r, &column.name),
576            None => self.has_column_with_unqualified_name(&column.name),
577        }
578    }
579
580    /// Check to see if unqualified field names matches field names in Arrow schema
581    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
582        self.inner
583            .fields
584            .iter()
585            .zip(arrow_schema.fields().iter())
586            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
587    }
588
589    /// Check to see if fields in 2 Arrow schemas are compatible
590    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
591    pub fn check_arrow_schema_type_compatible(
592        &self,
593        arrow_schema: &Schema,
594    ) -> Result<()> {
595        let self_arrow_schema = self.as_arrow();
596        self_arrow_schema
597            .fields()
598            .iter()
599            .zip(arrow_schema.fields().iter())
600            .try_for_each(|(l_field, r_field)| {
601                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
602                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
603                                r_field.name(),
604                                r_field.data_type(),
605                                l_field.name(),
606                                l_field.data_type())
607                } else {
608                    Ok(())
609                }
610            })
611    }
612
613    /// Returns true if the two schemas have the same qualified named
614    /// fields with logically equivalent data types. Returns false otherwise.
615    ///
616    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
617    /// equivalence checking.
618    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
619        if self.fields().len() != other.fields().len() {
620            return false;
621        }
622        let self_fields = self.iter();
623        let other_fields = other.iter();
624        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
625            q1 == q2
626                && f1.name() == f2.name()
627                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
628        })
629    }
630
631    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
632    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
633        self.has_equivalent_names_and_types(other).is_ok()
634    }
635
636    /// Returns Ok if the two schemas have the same qualified named
637    /// fields with the compatible data types.
638    ///
639    /// Returns an `Err` with a message otherwise.
640    ///
641    /// This is a specialized version of Eq that ignores differences in
642    /// nullability and metadata.
643    ///
644    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
645    /// logical type checking, which for example would consider a dictionary
646    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
647    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
648        // case 1 : schema length mismatch
649        if self.fields().len() != other.fields().len() {
650            _plan_err!(
651                "Schema mismatch: the schema length are not same \
652            Expected schema length: {}, got: {}",
653                self.fields().len(),
654                other.fields().len()
655            )
656        } else {
657            // case 2 : schema length match, but fields mismatch
658            // check if the fields name are the same and have the same data types
659            self.fields()
660                .iter()
661                .zip(other.fields().iter())
662                .try_for_each(|(f1, f2)| {
663                    if f1.name() != f2.name()
664                        || (!DFSchema::datatype_is_semantically_equal(
665                            f1.data_type(),
666                            f2.data_type(),
667                        ))
668                    {
669                        _plan_err!(
670                            "Schema mismatch: Expected field '{}' with type {}, \
671                            but got '{}' with type {}.",
672                            f1.name(),
673                            f1.data_type(),
674                            f2.name(),
675                            f2.data_type()
676                        )
677                    } else {
678                        Ok(())
679                    }
680                })
681        }
682    }
683
684    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
685    /// than datatype_is_semantically_equal in that different representations of same data can be
686    /// logically but not semantically equivalent. Semantically equivalent types are always also
687    /// logically equivalent. For example:
688    /// - a Dictionary<K,V> type is logically equal to a plain V type
689    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
690    /// - Utf8 and Utf8View are logically equal
691    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
692        // check nested fields
693        match (dt1, dt2) {
694            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
695                v1.as_ref() == v2.as_ref()
696            }
697            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
698            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
699            (DataType::List(f1), DataType::List(f2))
700            | (DataType::LargeList(f1), DataType::LargeList(f2))
701            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
702                // Don't compare the names of the technical inner field
703                // Usually "item" but that's not mandated
704                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
705            }
706            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
707                // Don't compare the names of the technical inner fields
708                // Usually "entries", "key", "value" but that's not mandated
709                match (f1.data_type(), f2.data_type()) {
710                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
711                        f1_inner.len() == f2_inner.len()
712                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
713                                Self::datatype_is_logically_equal(
714                                    f1.data_type(),
715                                    f2.data_type(),
716                                )
717                            })
718                    }
719                    _ => panic!("Map type should have an inner struct field"),
720                }
721            }
722            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
723                let iter1 = fields1.iter();
724                let iter2 = fields2.iter();
725                fields1.len() == fields2.len() &&
726                        // all fields have to be the same
727                    iter1
728                    .zip(iter2)
729                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
730            }
731            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
732                let iter1 = fields1.iter();
733                let iter2 = fields2.iter();
734                fields1.len() == fields2.len() &&
735                    // all fields have to be the same
736                    iter1
737                        .zip(iter2)
738                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
739            }
740            // Utf8 and Utf8View are logically equivalent
741            (DataType::Utf8, DataType::Utf8View) => true,
742            (DataType::Utf8View, DataType::Utf8) => true,
743            _ => Self::datatype_is_semantically_equal(dt1, dt2),
744        }
745    }
746
747    /// Returns true of two [`DataType`]s are semantically equal (same
748    /// name and type), ignoring both metadata and nullability, decimal precision/scale,
749    /// and timezone time units/timezones.
750    ///
751    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
752    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
753        // check nested fields
754        match (dt1, dt2) {
755            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
756                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
757                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
758            }
759            (DataType::List(f1), DataType::List(f2))
760            | (DataType::LargeList(f1), DataType::LargeList(f2))
761            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
762                // Don't compare the names of the technical inner field
763                // Usually "item" but that's not mandated
764                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
765            }
766            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
767                // Don't compare the names of the technical inner fields
768                // Usually "entries", "key", "value" but that's not mandated
769                match (f1.data_type(), f2.data_type()) {
770                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
771                        f1_inner.len() == f2_inner.len()
772                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
773                                Self::datatype_is_semantically_equal(
774                                    f1.data_type(),
775                                    f2.data_type(),
776                                )
777                            })
778                    }
779                    _ => panic!("Map type should have an inner struct field"),
780                }
781            }
782            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
783                let iter1 = fields1.iter();
784                let iter2 = fields2.iter();
785                fields1.len() == fields2.len() &&
786                        // all fields have to be the same
787                    iter1
788                    .zip(iter2)
789                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
790            }
791            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
792                let iter1 = fields1.iter();
793                let iter2 = fields2.iter();
794                fields1.len() == fields2.len() &&
795                    // all fields have to be the same
796                    iter1
797                        .zip(iter2)
798                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
799            }
800            (
801                DataType::Decimal32(_l_precision, _l_scale),
802                DataType::Decimal32(_r_precision, _r_scale),
803            ) => true,
804            (
805                DataType::Decimal64(_l_precision, _l_scale),
806                DataType::Decimal64(_r_precision, _r_scale),
807            ) => true,
808            (
809                DataType::Decimal128(_l_precision, _l_scale),
810                DataType::Decimal128(_r_precision, _r_scale),
811            ) => true,
812            (
813                DataType::Decimal256(_l_precision, _l_scale),
814                DataType::Decimal256(_r_precision, _r_scale),
815            ) => true,
816            (
817                DataType::Timestamp(_l_time_unit, _l_timezone),
818                DataType::Timestamp(_r_time_unit, _r_timezone),
819            ) => true,
820            _ => dt1 == dt2,
821        }
822    }
823
824    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
825        f1.name() == f2.name()
826            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
827    }
828
829    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
830        f1.name() == f2.name()
831            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
832    }
833
834    /// Strip all field qualifier in schema
835    pub fn strip_qualifiers(self) -> Self {
836        DFSchema {
837            field_qualifiers: vec![None; self.inner.fields.len()],
838            inner: self.inner,
839            functional_dependencies: self.functional_dependencies,
840        }
841    }
842
843    /// Replace all field qualifier with new value in schema
844    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
845        let qualifier = qualifier.into();
846        DFSchema {
847            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
848            inner: self.inner,
849            functional_dependencies: self.functional_dependencies,
850        }
851    }
852
853    /// Get list of fully-qualified field names in this schema
854    pub fn field_names(&self) -> Vec<String> {
855        self.iter()
856            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
857            .collect::<Vec<_>>()
858    }
859
860    /// Get metadata of this schema
861    pub fn metadata(&self) -> &HashMap<String, String> {
862        &self.inner.metadata
863    }
864
865    /// Get functional dependencies
866    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
867        &self.functional_dependencies
868    }
869
870    /// Iterate over the qualifiers and fields in the DFSchema
871    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
872        self.field_qualifiers
873            .iter()
874            .zip(self.inner.fields().iter())
875            .map(|(qualifier, field)| (qualifier.as_ref(), field))
876    }
877    /// Returns a tree-like string representation of the schema.
878    ///
879    /// This method formats the schema
880    /// with a tree-like structure showing field names, types, and nullability.
881    ///
882    /// # Example
883    ///
884    /// ```
885    /// use arrow::datatypes::{DataType, Field, Schema};
886    /// use datafusion_common::DFSchema;
887    /// use std::collections::HashMap;
888    ///
889    /// let schema = DFSchema::from_unqualified_fields(
890    ///     vec![
891    ///         Field::new("id", DataType::Int32, false),
892    ///         Field::new("name", DataType::Utf8, true),
893    ///     ]
894    ///     .into(),
895    ///     HashMap::new(),
896    /// )
897    /// .unwrap();
898    ///
899    /// assert_eq!(
900    ///     schema.tree_string().to_string(),
901    ///     r#"root
902    ///  |-- id: int32 (nullable = false)
903    ///  |-- name: utf8 (nullable = true)"#
904    /// );
905    /// ```
906    pub fn tree_string(&self) -> impl Display + '_ {
907        let mut result = String::from("root\n");
908
909        for (qualifier, field) in self.iter() {
910            let field_name = match qualifier {
911                Some(q) => format!("{}.{}", q, field.name()),
912                None => field.name().to_string(),
913            };
914
915            format_field_with_indent(
916                &mut result,
917                &field_name,
918                field.data_type(),
919                field.is_nullable(),
920                " ",
921            );
922        }
923
924        // Remove the trailing newline
925        if result.ends_with('\n') {
926            result.pop();
927        }
928
929        result
930    }
931}
932
933/// Format field with proper nested indentation for complex types
934fn format_field_with_indent(
935    result: &mut String,
936    field_name: &str,
937    data_type: &DataType,
938    nullable: bool,
939    indent: &str,
940) {
941    let nullable_str = nullable.to_string().to_lowercase();
942    let child_indent = format!("{indent}|    ");
943
944    match data_type {
945        DataType::List(field) => {
946            result.push_str(&format!(
947                "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
948            ));
949            format_field_with_indent(
950                result,
951                field.name(),
952                field.data_type(),
953                field.is_nullable(),
954                &child_indent,
955            );
956        }
957        DataType::LargeList(field) => {
958            result.push_str(&format!(
959                "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
960            ));
961            format_field_with_indent(
962                result,
963                field.name(),
964                field.data_type(),
965                field.is_nullable(),
966                &child_indent,
967            );
968        }
969        DataType::FixedSizeList(field, _size) => {
970            result.push_str(&format!(
971                "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
972            ));
973            format_field_with_indent(
974                result,
975                field.name(),
976                field.data_type(),
977                field.is_nullable(),
978                &child_indent,
979            );
980        }
981        DataType::Map(field, _) => {
982            result.push_str(&format!(
983                "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
984            ));
985            if let DataType::Struct(inner_fields) = field.data_type() {
986                if inner_fields.len() == 2 {
987                    format_field_with_indent(
988                        result,
989                        "key",
990                        inner_fields[0].data_type(),
991                        inner_fields[0].is_nullable(),
992                        &child_indent,
993                    );
994                    let value_contains_null =
995                        field.is_nullable().to_string().to_lowercase();
996                    // Handle complex value types properly
997                    match inner_fields[1].data_type() {
998                        DataType::Struct(_)
999                        | DataType::List(_)
1000                        | DataType::LargeList(_)
1001                        | DataType::FixedSizeList(_, _)
1002                        | DataType::Map(_, _) => {
1003                            format_field_with_indent(
1004                                result,
1005                                "value",
1006                                inner_fields[1].data_type(),
1007                                inner_fields[1].is_nullable(),
1008                                &child_indent,
1009                            );
1010                        }
1011                        _ => {
1012                            result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1013                                format_simple_data_type(inner_fields[1].data_type())));
1014                        }
1015                    }
1016                }
1017            }
1018        }
1019        DataType::Struct(fields) => {
1020            result.push_str(&format!(
1021                "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1022            ));
1023            for struct_field in fields {
1024                format_field_with_indent(
1025                    result,
1026                    struct_field.name(),
1027                    struct_field.data_type(),
1028                    struct_field.is_nullable(),
1029                    &child_indent,
1030                );
1031            }
1032        }
1033        _ => {
1034            let type_str = format_simple_data_type(data_type);
1035            result.push_str(&format!(
1036                "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1037            ));
1038        }
1039    }
1040}
1041
1042/// Format simple DataType in lowercase format (for leaf nodes)
1043fn format_simple_data_type(data_type: &DataType) -> String {
1044    match data_type {
1045        DataType::Boolean => "boolean".to_string(),
1046        DataType::Int8 => "int8".to_string(),
1047        DataType::Int16 => "int16".to_string(),
1048        DataType::Int32 => "int32".to_string(),
1049        DataType::Int64 => "int64".to_string(),
1050        DataType::UInt8 => "uint8".to_string(),
1051        DataType::UInt16 => "uint16".to_string(),
1052        DataType::UInt32 => "uint32".to_string(),
1053        DataType::UInt64 => "uint64".to_string(),
1054        DataType::Float16 => "float16".to_string(),
1055        DataType::Float32 => "float32".to_string(),
1056        DataType::Float64 => "float64".to_string(),
1057        DataType::Utf8 => "utf8".to_string(),
1058        DataType::LargeUtf8 => "large_utf8".to_string(),
1059        DataType::Binary => "binary".to_string(),
1060        DataType::LargeBinary => "large_binary".to_string(),
1061        DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1062        DataType::Date32 => "date32".to_string(),
1063        DataType::Date64 => "date64".to_string(),
1064        DataType::Time32(_) => "time32".to_string(),
1065        DataType::Time64(_) => "time64".to_string(),
1066        DataType::Timestamp(_, tz) => match tz {
1067            Some(tz_str) => format!("timestamp ({tz_str})"),
1068            None => "timestamp".to_string(),
1069        },
1070        DataType::Interval(_) => "interval".to_string(),
1071        DataType::Dictionary(_, value_type) => {
1072            format_simple_data_type(value_type.as_ref())
1073        }
1074        DataType::Decimal32(precision, scale) => {
1075            format!("decimal32({precision}, {scale})")
1076        }
1077        DataType::Decimal64(precision, scale) => {
1078            format!("decimal64({precision}, {scale})")
1079        }
1080        DataType::Decimal128(precision, scale) => {
1081            format!("decimal128({precision}, {scale})")
1082        }
1083        DataType::Decimal256(precision, scale) => {
1084            format!("decimal256({precision}, {scale})")
1085        }
1086        DataType::Null => "null".to_string(),
1087        _ => format!("{data_type}").to_lowercase(),
1088    }
1089}
1090
1091/// Allow DFSchema to be converted into an Arrow `&Schema`
1092impl AsRef<Schema> for DFSchema {
1093    fn as_ref(&self) -> &Schema {
1094        self.as_arrow()
1095    }
1096}
1097
1098/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
1099/// example)
1100impl AsRef<SchemaRef> for DFSchema {
1101    fn as_ref(&self) -> &SchemaRef {
1102        self.inner()
1103    }
1104}
1105
1106/// Create a `DFSchema` from an Arrow schema
1107impl TryFrom<Schema> for DFSchema {
1108    type Error = DataFusionError;
1109    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
1110        Self::try_from(Arc::new(schema))
1111    }
1112}
1113
1114impl TryFrom<SchemaRef> for DFSchema {
1115    type Error = DataFusionError;
1116    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1117        let field_count = schema.fields.len();
1118        let dfschema = Self {
1119            inner: schema,
1120            field_qualifiers: vec![None; field_count],
1121            functional_dependencies: FunctionalDependencies::empty(),
1122        };
1123        // Without checking names, because schema here may have duplicate field names.
1124        // For example, Partial AggregateMode will generate duplicate field names from
1125        // state_fields.
1126        // See <https://github.com/apache/datafusion/issues/17715>
1127        // dfschema.check_names()?;
1128        Ok(dfschema)
1129    }
1130}
1131
1132// Hashing refers to a subset of fields considered in PartialEq.
1133impl Hash for DFSchema {
1134    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1135        self.inner.fields.hash(state);
1136        self.inner.metadata.len().hash(state); // HashMap is not hashable
1137    }
1138}
1139
1140/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
1141pub trait ToDFSchema
1142where
1143    Self: Sized,
1144{
1145    /// Attempt to create a DSSchema
1146    fn to_dfschema(self) -> Result<DFSchema>;
1147
1148    /// Attempt to create a DSSchemaRef
1149    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1150        Ok(Arc::new(self.to_dfschema()?))
1151    }
1152}
1153
1154impl ToDFSchema for Schema {
1155    fn to_dfschema(self) -> Result<DFSchema> {
1156        DFSchema::try_from(self)
1157    }
1158}
1159
1160impl ToDFSchema for SchemaRef {
1161    fn to_dfschema(self) -> Result<DFSchema> {
1162        DFSchema::try_from(self)
1163    }
1164}
1165
1166impl ToDFSchema for Vec<Field> {
1167    fn to_dfschema(self) -> Result<DFSchema> {
1168        let field_count = self.len();
1169        let schema = Schema {
1170            fields: self.into(),
1171            metadata: HashMap::new(),
1172        };
1173        let dfschema = DFSchema {
1174            inner: schema.into(),
1175            field_qualifiers: vec![None; field_count],
1176            functional_dependencies: FunctionalDependencies::empty(),
1177        };
1178        Ok(dfschema)
1179    }
1180}
1181
1182impl Display for DFSchema {
1183    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1184        write!(
1185            f,
1186            "fields:[{}], metadata:{:?}",
1187            self.iter()
1188                .map(|(q, f)| qualified_name(q, f.name()))
1189                .collect::<Vec<String>>()
1190                .join(", "),
1191            self.inner.metadata
1192        )
1193    }
1194}
1195
1196/// Provides schema information needed by certain methods of `Expr`
1197/// (defined in the datafusion-common crate).
1198///
1199/// Note that this trait is implemented for &[DFSchema] which is
1200/// widely used in the DataFusion codebase.
1201pub trait ExprSchema: std::fmt::Debug {
1202    /// Is this column reference nullable?
1203    fn nullable(&self, col: &Column) -> Result<bool> {
1204        Ok(self.field_from_column(col)?.is_nullable())
1205    }
1206
1207    /// What is the datatype of this column?
1208    fn data_type(&self, col: &Column) -> Result<&DataType> {
1209        Ok(self.field_from_column(col)?.data_type())
1210    }
1211
1212    /// Returns the column's optional metadata.
1213    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1214        Ok(self.field_from_column(col)?.metadata())
1215    }
1216
1217    /// Return the column's datatype and nullability
1218    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1219        let field = self.field_from_column(col)?;
1220        Ok((field.data_type(), field.is_nullable()))
1221    }
1222
1223    // Return the column's field
1224    fn field_from_column(&self, col: &Column) -> Result<&Field>;
1225}
1226
1227// Implement `ExprSchema` for `Arc<DFSchema>`
1228impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1229    fn nullable(&self, col: &Column) -> Result<bool> {
1230        self.as_ref().nullable(col)
1231    }
1232
1233    fn data_type(&self, col: &Column) -> Result<&DataType> {
1234        self.as_ref().data_type(col)
1235    }
1236
1237    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1238        ExprSchema::metadata(self.as_ref(), col)
1239    }
1240
1241    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1242        self.as_ref().data_type_and_nullable(col)
1243    }
1244
1245    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1246        self.as_ref().field_from_column(col)
1247    }
1248}
1249
1250impl ExprSchema for DFSchema {
1251    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1252        match &col.relation {
1253            Some(r) => self.field_with_qualified_name(r, &col.name),
1254            None => self.field_with_unqualified_name(&col.name),
1255        }
1256    }
1257}
1258
1259/// DataFusion-specific extensions to [`Schema`].
1260pub trait SchemaExt {
1261    /// This is a specialized version of Eq that ignores differences
1262    /// in nullability and metadata.
1263    ///
1264    /// It works the same as [`DFSchema::equivalent_names_and_types`].
1265    fn equivalent_names_and_types(&self, other: &Self) -> bool;
1266
1267    /// Returns nothing if the two schemas have the same qualified named
1268    /// fields with logically equivalent data types. Returns internal error otherwise.
1269    ///
1270    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
1271    /// equivalence checking.
1272    ///
1273    /// It is only used by insert into cases.
1274    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
1275}
1276
1277impl SchemaExt for Schema {
1278    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1279        if self.fields().len() != other.fields().len() {
1280            return false;
1281        }
1282
1283        self.fields()
1284            .iter()
1285            .zip(other.fields().iter())
1286            .all(|(f1, f2)| {
1287                f1.name() == f2.name()
1288                    && DFSchema::datatype_is_semantically_equal(
1289                        f1.data_type(),
1290                        f2.data_type(),
1291                    )
1292            })
1293    }
1294
1295    // It is only used by insert into cases.
1296    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1297        // case 1 : schema length mismatch
1298        if self.fields().len() != other.fields().len() {
1299            _plan_err!(
1300                "Inserting query must have the same schema length as the table. \
1301            Expected table schema length: {}, got: {}",
1302                self.fields().len(),
1303                other.fields().len()
1304            )
1305        } else {
1306            // case 2 : schema length match, but fields mismatch
1307            // check if the fields name are the same and have the same data types
1308            self.fields()
1309                .iter()
1310                .zip(other.fields().iter())
1311                .try_for_each(|(f1, f2)| {
1312                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1313                        _plan_err!(
1314                            "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1315                            but got '{}' with type {}.",
1316                            f1.name(),
1317                            f1.data_type(),
1318                            f2.name(),
1319                            f2.data_type())
1320                    } else {
1321                        Ok(())
1322                    }
1323                })
1324        }
1325    }
1326}
1327
1328pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1329    match qualifier {
1330        Some(q) => format!("{q}.{name}"),
1331        None => name.to_string(),
1332    }
1333}
1334
1335#[cfg(test)]
1336mod tests {
1337    use crate::assert_contains;
1338
1339    use super::*;
1340
1341    #[test]
1342    fn qualifier_in_name() -> Result<()> {
1343        let col = Column::from_name("t1.c0");
1344        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1345        // lookup with unqualified name "t1.c0"
1346        let err = schema.index_of_column(&col).unwrap_err();
1347        let expected = "Schema error: No field named \"t1.c0\". \
1348            Column names are case sensitive. \
1349            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1350            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1351            Did you mean 't1.c0'?.";
1352        assert_eq!(err.strip_backtrace(), expected);
1353        Ok(())
1354    }
1355
1356    #[test]
1357    fn quoted_qualifiers_in_name() -> Result<()> {
1358        let col = Column::from_name("t1.c0");
1359        let schema = DFSchema::try_from_qualified_schema(
1360            "t1",
1361            &Schema::new(vec![
1362                Field::new("CapitalColumn", DataType::Boolean, true),
1363                Field::new("field.with.period", DataType::Boolean, true),
1364            ]),
1365        )?;
1366
1367        // lookup with unqualified name "t1.c0"
1368        let err = schema.index_of_column(&col).unwrap_err();
1369        let expected = "Schema error: No field named \"t1.c0\". \
1370            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1371        assert_eq!(err.strip_backtrace(), expected);
1372        Ok(())
1373    }
1374
1375    #[test]
1376    fn from_unqualified_schema() -> Result<()> {
1377        let schema = DFSchema::try_from(test_schema_1())?;
1378        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1379        Ok(())
1380    }
1381
1382    #[test]
1383    fn from_qualified_schema() -> Result<()> {
1384        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1385        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1386        Ok(())
1387    }
1388
1389    #[test]
1390    fn test_from_field_specific_qualified_schema() -> Result<()> {
1391        let schema = DFSchema::from_field_specific_qualified_schema(
1392            vec![Some("t1".into()), None],
1393            &Arc::new(Schema::new(vec![
1394                Field::new("c0", DataType::Boolean, true),
1395                Field::new("c1", DataType::Boolean, true),
1396            ])),
1397        )?;
1398        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1399        Ok(())
1400    }
1401
1402    #[test]
1403    fn test_from_qualified_fields() -> Result<()> {
1404        let schema = DFSchema::new_with_metadata(
1405            vec![
1406                (
1407                    Some("t0".into()),
1408                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1409                ),
1410                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1411            ],
1412            HashMap::new(),
1413        )?;
1414        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1415        Ok(())
1416    }
1417
1418    #[test]
1419    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1420        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1421        let arrow_schema = schema.as_arrow();
1422        insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
1423        Ok(())
1424    }
1425
1426    #[test]
1427    fn join_qualified() -> Result<()> {
1428        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1429        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1430        let join = left.join(&right)?;
1431        assert_eq!(
1432            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1433            join.to_string()
1434        );
1435        // test valid access
1436        assert!(join
1437            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1438            .is_ok());
1439        assert!(join
1440            .field_with_qualified_name(&TableReference::bare("t2"), "c0")
1441            .is_ok());
1442        // test invalid access
1443        assert!(join.field_with_unqualified_name("c0").is_err());
1444        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1445        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1446        Ok(())
1447    }
1448
1449    #[test]
1450    fn join_qualified_duplicate() -> Result<()> {
1451        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1452        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1453        let join = left.join(&right);
1454        assert_eq!(
1455            join.unwrap_err().strip_backtrace(),
1456            "Schema error: Schema contains duplicate qualified field name t1.c0",
1457        );
1458        Ok(())
1459    }
1460
1461    #[test]
1462    fn join_unqualified_duplicate() -> Result<()> {
1463        let left = DFSchema::try_from(test_schema_1())?;
1464        let right = DFSchema::try_from(test_schema_1())?;
1465        let join = left.join(&right);
1466        assert_eq!(
1467            join.unwrap_err().strip_backtrace(),
1468            "Schema error: Schema contains duplicate unqualified field name c0"
1469        );
1470        Ok(())
1471    }
1472
1473    #[test]
1474    fn join_mixed() -> Result<()> {
1475        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1476        let right = DFSchema::try_from(test_schema_2())?;
1477        let join = left.join(&right)?;
1478        assert_eq!(
1479            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1480            join.to_string()
1481        );
1482        // test valid access
1483        assert!(join
1484            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1485            .is_ok());
1486        assert!(join.field_with_unqualified_name("c0").is_ok());
1487        assert!(join.field_with_unqualified_name("c100").is_ok());
1488        assert!(join.field_with_name(None, "c100").is_ok());
1489        // test invalid access
1490        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1491        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1492        assert!(join
1493            .field_with_qualified_name(&TableReference::bare(""), "c100")
1494            .is_err());
1495        Ok(())
1496    }
1497
1498    #[test]
1499    fn join_mixed_duplicate() -> Result<()> {
1500        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1501        let right = DFSchema::try_from(test_schema_1())?;
1502        let join = left.join(&right);
1503        assert_contains!(join.unwrap_err().to_string(),
1504                         "Schema error: Schema contains qualified \
1505                          field name t1.c0 and unqualified field name c0 which would be ambiguous");
1506        Ok(())
1507    }
1508
1509    #[test]
1510    fn helpful_error_messages() -> Result<()> {
1511        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1512        let expected_help = "Valid fields are t1.c0, t1.c1.";
1513        assert_contains!(
1514            schema
1515                .field_with_qualified_name(&TableReference::bare("x"), "y")
1516                .unwrap_err()
1517                .to_string(),
1518            expected_help
1519        );
1520        assert_contains!(
1521            schema
1522                .field_with_unqualified_name("y")
1523                .unwrap_err()
1524                .to_string(),
1525            expected_help
1526        );
1527        assert!(schema.index_of_column_by_name(None, "y").is_none());
1528        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1529
1530        Ok(())
1531    }
1532
1533    #[test]
1534    fn select_without_valid_fields() {
1535        let schema = DFSchema::empty();
1536
1537        let col = Column::from_qualified_name("t1.c0");
1538        let err = schema.index_of_column(&col).unwrap_err();
1539        let expected = "Schema error: No field named t1.c0.";
1540        assert_eq!(err.strip_backtrace(), expected);
1541
1542        // the same check without qualifier
1543        let col = Column::from_name("c0");
1544        let err = schema.index_of_column(&col).err().unwrap();
1545        let expected = "Schema error: No field named c0.";
1546        assert_eq!(err.strip_backtrace(), expected);
1547    }
1548
1549    #[test]
1550    fn into() {
1551        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1552        let arrow_schema = Schema::new_with_metadata(
1553            vec![Field::new("c0", DataType::Int64, true)],
1554            test_metadata(),
1555        );
1556        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1557
1558        let df_schema = DFSchema {
1559            inner: Arc::clone(&arrow_schema_ref),
1560            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1561            functional_dependencies: FunctionalDependencies::empty(),
1562        };
1563        let df_schema_ref = Arc::new(df_schema.clone());
1564
1565        {
1566            let arrow_schema = arrow_schema.clone();
1567            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1568
1569            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1570            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1571        }
1572
1573        {
1574            let arrow_schema = arrow_schema.clone();
1575            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1576
1577            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1578            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1579        }
1580
1581        // Now, consume the refs
1582        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1583        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1584    }
1585
1586    fn test_schema_1() -> Schema {
1587        Schema::new(vec![
1588            Field::new("c0", DataType::Boolean, true),
1589            Field::new("c1", DataType::Boolean, true),
1590        ])
1591    }
1592    #[test]
1593    fn test_dfschema_to_schema_conversion() {
1594        let mut a_metadata = HashMap::new();
1595        a_metadata.insert("key".to_string(), "value".to_string());
1596        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1597
1598        let mut b_metadata = HashMap::new();
1599        b_metadata.insert("key".to_string(), "value".to_string());
1600        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1601
1602        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1603
1604        let df_schema = DFSchema {
1605            inner: Arc::clone(&schema),
1606            field_qualifiers: vec![None; schema.fields.len()],
1607            functional_dependencies: FunctionalDependencies::empty(),
1608        };
1609
1610        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1611    }
1612
1613    #[test]
1614    fn test_contain_column() -> Result<()> {
1615        // qualified exists
1616        {
1617            let col = Column::from_qualified_name("t1.c0");
1618            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1619            assert!(schema.is_column_from_schema(&col));
1620        }
1621
1622        // qualified not exists
1623        {
1624            let col = Column::from_qualified_name("t1.c2");
1625            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1626            assert!(!schema.is_column_from_schema(&col));
1627        }
1628
1629        // unqualified exists
1630        {
1631            let col = Column::from_name("c0");
1632            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1633            assert!(schema.is_column_from_schema(&col));
1634        }
1635
1636        // unqualified not exists
1637        {
1638            let col = Column::from_name("c2");
1639            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1640            assert!(!schema.is_column_from_schema(&col));
1641        }
1642
1643        Ok(())
1644    }
1645
1646    #[test]
1647    fn test_datatype_is_logically_equal() {
1648        assert!(DFSchema::datatype_is_logically_equal(
1649            &DataType::Int8,
1650            &DataType::Int8
1651        ));
1652
1653        assert!(!DFSchema::datatype_is_logically_equal(
1654            &DataType::Int8,
1655            &DataType::Int16
1656        ));
1657
1658        // Test lists
1659
1660        // Succeeds if both have the same element type, disregards names and nullability
1661        assert!(DFSchema::datatype_is_logically_equal(
1662            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1663            &DataType::List(Field::new("element", DataType::Int8, false).into())
1664        ));
1665
1666        // Fails if element type is different
1667        assert!(!DFSchema::datatype_is_logically_equal(
1668            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1669            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1670        ));
1671
1672        // Test maps
1673        let map_field = DataType::Map(
1674            Field::new(
1675                "entries",
1676                DataType::Struct(Fields::from(vec![
1677                    Field::new("key", DataType::Int8, false),
1678                    Field::new("value", DataType::Int8, true),
1679                ])),
1680                true,
1681            )
1682            .into(),
1683            true,
1684        );
1685
1686        // Succeeds if both maps have the same key and value types, disregards names and nullability
1687        assert!(DFSchema::datatype_is_logically_equal(
1688            &map_field,
1689            &DataType::Map(
1690                Field::new(
1691                    "pairs",
1692                    DataType::Struct(Fields::from(vec![
1693                        Field::new("one", DataType::Int8, false),
1694                        Field::new("two", DataType::Int8, false)
1695                    ])),
1696                    true
1697                )
1698                .into(),
1699                true
1700            )
1701        ));
1702        // Fails if value type is different
1703        assert!(!DFSchema::datatype_is_logically_equal(
1704            &map_field,
1705            &DataType::Map(
1706                Field::new(
1707                    "entries",
1708                    DataType::Struct(Fields::from(vec![
1709                        Field::new("key", DataType::Int8, false),
1710                        Field::new("value", DataType::Int16, true)
1711                    ])),
1712                    true
1713                )
1714                .into(),
1715                true
1716            )
1717        ));
1718
1719        // Fails if key type is different
1720        assert!(!DFSchema::datatype_is_logically_equal(
1721            &map_field,
1722            &DataType::Map(
1723                Field::new(
1724                    "entries",
1725                    DataType::Struct(Fields::from(vec![
1726                        Field::new("key", DataType::Int16, false),
1727                        Field::new("value", DataType::Int8, true)
1728                    ])),
1729                    true
1730                )
1731                .into(),
1732                true
1733            )
1734        ));
1735
1736        // Test structs
1737
1738        let struct_field = DataType::Struct(Fields::from(vec![
1739            Field::new("a", DataType::Int8, true),
1740            Field::new("b", DataType::Int8, true),
1741        ]));
1742
1743        // Succeeds if both have same names and datatypes, ignores nullability
1744        assert!(DFSchema::datatype_is_logically_equal(
1745            &struct_field,
1746            &DataType::Struct(Fields::from(vec![
1747                Field::new("a", DataType::Int8, false),
1748                Field::new("b", DataType::Int8, true),
1749            ]))
1750        ));
1751
1752        // Fails if field names are different
1753        assert!(!DFSchema::datatype_is_logically_equal(
1754            &struct_field,
1755            &DataType::Struct(Fields::from(vec![
1756                Field::new("x", DataType::Int8, true),
1757                Field::new("y", DataType::Int8, true),
1758            ]))
1759        ));
1760
1761        // Fails if types are different
1762        assert!(!DFSchema::datatype_is_logically_equal(
1763            &struct_field,
1764            &DataType::Struct(Fields::from(vec![
1765                Field::new("a", DataType::Int16, true),
1766                Field::new("b", DataType::Int8, true),
1767            ]))
1768        ));
1769
1770        // Fails if more or less fields
1771        assert!(!DFSchema::datatype_is_logically_equal(
1772            &struct_field,
1773            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1774        ));
1775    }
1776
1777    #[test]
1778    fn test_datatype_is_logically_equivalent_to_dictionary() {
1779        // Dictionary is logically equal to its value type
1780        assert!(DFSchema::datatype_is_logically_equal(
1781            &DataType::Utf8,
1782            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1783        ));
1784    }
1785
1786    #[test]
1787    fn test_datatype_is_semantically_equal() {
1788        assert!(DFSchema::datatype_is_semantically_equal(
1789            &DataType::Int8,
1790            &DataType::Int8
1791        ));
1792
1793        assert!(!DFSchema::datatype_is_semantically_equal(
1794            &DataType::Int8,
1795            &DataType::Int16
1796        ));
1797
1798        // Succeeds if decimal precision and scale are different
1799        assert!(DFSchema::datatype_is_semantically_equal(
1800            &DataType::Decimal32(1, 2),
1801            &DataType::Decimal32(2, 1),
1802        ));
1803
1804        assert!(DFSchema::datatype_is_semantically_equal(
1805            &DataType::Decimal64(1, 2),
1806            &DataType::Decimal64(2, 1),
1807        ));
1808
1809        assert!(DFSchema::datatype_is_semantically_equal(
1810            &DataType::Decimal128(1, 2),
1811            &DataType::Decimal128(2, 1),
1812        ));
1813
1814        assert!(DFSchema::datatype_is_semantically_equal(
1815            &DataType::Decimal256(1, 2),
1816            &DataType::Decimal256(2, 1),
1817        ));
1818
1819        // Any two timestamp types should match
1820        assert!(DFSchema::datatype_is_semantically_equal(
1821            &DataType::Timestamp(
1822                arrow::datatypes::TimeUnit::Microsecond,
1823                Some("UTC".into())
1824            ),
1825            &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1826        ));
1827
1828        // Test lists
1829
1830        // Succeeds if both have the same element type, disregards names and nullability
1831        assert!(DFSchema::datatype_is_semantically_equal(
1832            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1833            &DataType::List(Field::new("element", DataType::Int8, false).into())
1834        ));
1835
1836        // Fails if element type is different
1837        assert!(!DFSchema::datatype_is_semantically_equal(
1838            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1839            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1840        ));
1841
1842        // Test maps
1843        let map_field = DataType::Map(
1844            Field::new(
1845                "entries",
1846                DataType::Struct(Fields::from(vec![
1847                    Field::new("key", DataType::Int8, false),
1848                    Field::new("value", DataType::Int8, true),
1849                ])),
1850                true,
1851            )
1852            .into(),
1853            true,
1854        );
1855
1856        // Succeeds if both maps have the same key and value types, disregards names and nullability
1857        assert!(DFSchema::datatype_is_semantically_equal(
1858            &map_field,
1859            &DataType::Map(
1860                Field::new(
1861                    "pairs",
1862                    DataType::Struct(Fields::from(vec![
1863                        Field::new("one", DataType::Int8, false),
1864                        Field::new("two", DataType::Int8, false)
1865                    ])),
1866                    true
1867                )
1868                .into(),
1869                true
1870            )
1871        ));
1872        // Fails if value type is different
1873        assert!(!DFSchema::datatype_is_semantically_equal(
1874            &map_field,
1875            &DataType::Map(
1876                Field::new(
1877                    "entries",
1878                    DataType::Struct(Fields::from(vec![
1879                        Field::new("key", DataType::Int8, false),
1880                        Field::new("value", DataType::Int16, true)
1881                    ])),
1882                    true
1883                )
1884                .into(),
1885                true
1886            )
1887        ));
1888
1889        // Fails if key type is different
1890        assert!(!DFSchema::datatype_is_semantically_equal(
1891            &map_field,
1892            &DataType::Map(
1893                Field::new(
1894                    "entries",
1895                    DataType::Struct(Fields::from(vec![
1896                        Field::new("key", DataType::Int16, false),
1897                        Field::new("value", DataType::Int8, true)
1898                    ])),
1899                    true
1900                )
1901                .into(),
1902                true
1903            )
1904        ));
1905
1906        // Test structs
1907
1908        let struct_field = DataType::Struct(Fields::from(vec![
1909            Field::new("a", DataType::Int8, true),
1910            Field::new("b", DataType::Int8, true),
1911        ]));
1912
1913        // Succeeds if both have same names and datatypes, ignores nullability
1914        assert!(DFSchema::datatype_is_logically_equal(
1915            &struct_field,
1916            &DataType::Struct(Fields::from(vec![
1917                Field::new("a", DataType::Int8, false),
1918                Field::new("b", DataType::Int8, true),
1919            ]))
1920        ));
1921
1922        // Fails if field names are different
1923        assert!(!DFSchema::datatype_is_logically_equal(
1924            &struct_field,
1925            &DataType::Struct(Fields::from(vec![
1926                Field::new("x", DataType::Int8, true),
1927                Field::new("y", DataType::Int8, true),
1928            ]))
1929        ));
1930
1931        // Fails if types are different
1932        assert!(!DFSchema::datatype_is_logically_equal(
1933            &struct_field,
1934            &DataType::Struct(Fields::from(vec![
1935                Field::new("a", DataType::Int16, true),
1936                Field::new("b", DataType::Int8, true),
1937            ]))
1938        ));
1939
1940        // Fails if more or less fields
1941        assert!(!DFSchema::datatype_is_logically_equal(
1942            &struct_field,
1943            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1944        ));
1945    }
1946
1947    #[test]
1948    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1949        // Dictionary is not semantically equal to its value type
1950        assert!(!DFSchema::datatype_is_semantically_equal(
1951            &DataType::Utf8,
1952            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1953        ));
1954    }
1955
1956    fn test_schema_2() -> Schema {
1957        Schema::new(vec![
1958            Field::new("c100", DataType::Boolean, true),
1959            Field::new("c101", DataType::Boolean, true),
1960        ])
1961    }
1962
1963    fn test_metadata() -> HashMap<String, String> {
1964        test_metadata_n(2)
1965    }
1966
1967    fn test_metadata_n(n: usize) -> HashMap<String, String> {
1968        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1969    }
1970
1971    #[test]
1972    fn test_print_schema_unqualified() {
1973        let schema = DFSchema::from_unqualified_fields(
1974            vec![
1975                Field::new("id", DataType::Int32, false),
1976                Field::new("name", DataType::Utf8, true),
1977                Field::new("age", DataType::Int64, true),
1978                Field::new("active", DataType::Boolean, false),
1979            ]
1980            .into(),
1981            HashMap::new(),
1982        )
1983        .unwrap();
1984
1985        let output = schema.tree_string();
1986
1987        insta::assert_snapshot!(output, @r"
1988        root
1989         |-- id: int32 (nullable = false)
1990         |-- name: utf8 (nullable = true)
1991         |-- age: int64 (nullable = true)
1992         |-- active: boolean (nullable = false)
1993        ");
1994    }
1995
1996    #[test]
1997    fn test_print_schema_qualified() {
1998        let schema = DFSchema::try_from_qualified_schema(
1999            "table1",
2000            &Schema::new(vec![
2001                Field::new("id", DataType::Int32, false),
2002                Field::new("name", DataType::Utf8, true),
2003            ]),
2004        )
2005        .unwrap();
2006
2007        let output = schema.tree_string();
2008
2009        insta::assert_snapshot!(output, @r"
2010        root
2011         |-- table1.id: int32 (nullable = false)
2012         |-- table1.name: utf8 (nullable = true)
2013        ");
2014    }
2015
2016    #[test]
2017    fn test_print_schema_complex_types() {
2018        let struct_field = Field::new(
2019            "address",
2020            DataType::Struct(Fields::from(vec![
2021                Field::new("street", DataType::Utf8, true),
2022                Field::new("city", DataType::Utf8, true),
2023            ])),
2024            true,
2025        );
2026
2027        let list_field = Field::new(
2028            "tags",
2029            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2030            true,
2031        );
2032
2033        let schema = DFSchema::from_unqualified_fields(
2034            vec![
2035                Field::new("id", DataType::Int32, false),
2036                struct_field,
2037                list_field,
2038                Field::new("score", DataType::Decimal128(10, 2), true),
2039            ]
2040            .into(),
2041            HashMap::new(),
2042        )
2043        .unwrap();
2044
2045        let output = schema.tree_string();
2046        insta::assert_snapshot!(output, @r"
2047        root
2048         |-- id: int32 (nullable = false)
2049         |-- address: struct (nullable = true)
2050         |    |-- street: utf8 (nullable = true)
2051         |    |-- city: utf8 (nullable = true)
2052         |-- tags: list (nullable = true)
2053         |    |-- item: utf8 (nullable = true)
2054         |-- score: decimal128(10, 2) (nullable = true)
2055        ");
2056    }
2057
2058    #[test]
2059    fn test_print_schema_empty() {
2060        let schema = DFSchema::empty();
2061        let output = schema.tree_string();
2062        insta::assert_snapshot!(output, @r###"root"###);
2063    }
2064
2065    #[test]
2066    fn test_print_schema_deeply_nested_types() {
2067        // Create a deeply nested structure to test indentation and complex type formatting
2068        let inner_struct = Field::new(
2069            "inner",
2070            DataType::Struct(Fields::from(vec![
2071                Field::new("level1", DataType::Utf8, true),
2072                Field::new("level2", DataType::Int32, false),
2073            ])),
2074            true,
2075        );
2076
2077        let nested_list = Field::new(
2078            "nested_list",
2079            DataType::List(Arc::new(Field::new(
2080                "item",
2081                DataType::Struct(Fields::from(vec![
2082                    Field::new("id", DataType::Int64, false),
2083                    Field::new("value", DataType::Float64, true),
2084                ])),
2085                true,
2086            ))),
2087            true,
2088        );
2089
2090        let map_field = Field::new(
2091            "map_data",
2092            DataType::Map(
2093                Arc::new(Field::new(
2094                    "entries",
2095                    DataType::Struct(Fields::from(vec![
2096                        Field::new("key", DataType::Utf8, false),
2097                        Field::new(
2098                            "value",
2099                            DataType::List(Arc::new(Field::new(
2100                                "item",
2101                                DataType::Int32,
2102                                true,
2103                            ))),
2104                            true,
2105                        ),
2106                    ])),
2107                    false,
2108                )),
2109                false,
2110            ),
2111            true,
2112        );
2113
2114        let schema = DFSchema::from_unqualified_fields(
2115            vec![
2116                Field::new("simple_field", DataType::Utf8, true),
2117                inner_struct,
2118                nested_list,
2119                map_field,
2120                Field::new(
2121                    "timestamp_field",
2122                    DataType::Timestamp(
2123                        arrow::datatypes::TimeUnit::Microsecond,
2124                        Some("UTC".into()),
2125                    ),
2126                    false,
2127                ),
2128            ]
2129            .into(),
2130            HashMap::new(),
2131        )
2132        .unwrap();
2133
2134        let output = schema.tree_string();
2135
2136        insta::assert_snapshot!(output, @r"
2137        root
2138         |-- simple_field: utf8 (nullable = true)
2139         |-- inner: struct (nullable = true)
2140         |    |-- level1: utf8 (nullable = true)
2141         |    |-- level2: int32 (nullable = false)
2142         |-- nested_list: list (nullable = true)
2143         |    |-- item: struct (nullable = true)
2144         |    |    |-- id: int64 (nullable = false)
2145         |    |    |-- value: float64 (nullable = true)
2146         |-- map_data: map (nullable = true)
2147         |    |-- key: utf8 (nullable = false)
2148         |    |-- value: list (nullable = true)
2149         |    |    |-- item: int32 (nullable = true)
2150         |-- timestamp_field: timestamp (UTC) (nullable = false)
2151        ");
2152    }
2153
2154    #[test]
2155    fn test_print_schema_mixed_qualified_unqualified() {
2156        // Test a schema with mixed qualified and unqualified fields
2157        let schema = DFSchema::new_with_metadata(
2158            vec![
2159                (
2160                    Some("table1".into()),
2161                    Arc::new(Field::new("id", DataType::Int32, false)),
2162                ),
2163                (None, Arc::new(Field::new("name", DataType::Utf8, true))),
2164                (
2165                    Some("table2".into()),
2166                    Arc::new(Field::new("score", DataType::Float64, true)),
2167                ),
2168                (
2169                    None,
2170                    Arc::new(Field::new("active", DataType::Boolean, false)),
2171                ),
2172            ],
2173            HashMap::new(),
2174        )
2175        .unwrap();
2176
2177        let output = schema.tree_string();
2178
2179        insta::assert_snapshot!(output, @r"
2180        root
2181         |-- table1.id: int32 (nullable = false)
2182         |-- name: utf8 (nullable = true)
2183         |-- table2.score: float64 (nullable = true)
2184         |-- active: boolean (nullable = false)
2185        ");
2186    }
2187
2188    #[test]
2189    fn test_print_schema_array_of_map() {
2190        // Test the specific example from user feedback: array of map
2191        let map_field = Field::new(
2192            "entries",
2193            DataType::Struct(Fields::from(vec![
2194                Field::new("key", DataType::Utf8, false),
2195                Field::new("value", DataType::Utf8, false),
2196            ])),
2197            false,
2198        );
2199
2200        let array_of_map_field = Field::new(
2201            "array_map_field",
2202            DataType::List(Arc::new(Field::new(
2203                "item",
2204                DataType::Map(Arc::new(map_field), false),
2205                false,
2206            ))),
2207            false,
2208        );
2209
2210        let schema = DFSchema::from_unqualified_fields(
2211            vec![array_of_map_field].into(),
2212            HashMap::new(),
2213        )
2214        .unwrap();
2215
2216        let output = schema.tree_string();
2217
2218        insta::assert_snapshot!(output, @r"
2219        root
2220         |-- array_map_field: list (nullable = false)
2221         |    |-- item: map (nullable = false)
2222         |    |    |-- key: utf8 (nullable = false)
2223         |    |    |-- value: utf8 (nullable = false)
2224        ");
2225    }
2226
2227    #[test]
2228    fn test_print_schema_complex_type_combinations() {
2229        // Test various combinations of list, struct, and map types
2230
2231        // List of structs
2232        let list_of_structs = Field::new(
2233            "list_of_structs",
2234            DataType::List(Arc::new(Field::new(
2235                "item",
2236                DataType::Struct(Fields::from(vec![
2237                    Field::new("id", DataType::Int32, false),
2238                    Field::new("name", DataType::Utf8, true),
2239                    Field::new("score", DataType::Float64, true),
2240                ])),
2241                true,
2242            ))),
2243            true,
2244        );
2245
2246        // Struct containing lists
2247        let struct_with_lists = Field::new(
2248            "struct_with_lists",
2249            DataType::Struct(Fields::from(vec![
2250                Field::new(
2251                    "tags",
2252                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2253                    true,
2254                ),
2255                Field::new(
2256                    "scores",
2257                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
2258                    false,
2259                ),
2260                Field::new("metadata", DataType::Utf8, true),
2261            ])),
2262            false,
2263        );
2264
2265        // Map with struct values
2266        let map_with_struct_values = Field::new(
2267            "map_with_struct_values",
2268            DataType::Map(
2269                Arc::new(Field::new(
2270                    "entries",
2271                    DataType::Struct(Fields::from(vec![
2272                        Field::new("key", DataType::Utf8, false),
2273                        Field::new(
2274                            "value",
2275                            DataType::Struct(Fields::from(vec![
2276                                Field::new("count", DataType::Int64, false),
2277                                Field::new("active", DataType::Boolean, true),
2278                            ])),
2279                            true,
2280                        ),
2281                    ])),
2282                    false,
2283                )),
2284                false,
2285            ),
2286            true,
2287        );
2288
2289        // List of maps
2290        let list_of_maps = Field::new(
2291            "list_of_maps",
2292            DataType::List(Arc::new(Field::new(
2293                "item",
2294                DataType::Map(
2295                    Arc::new(Field::new(
2296                        "entries",
2297                        DataType::Struct(Fields::from(vec![
2298                            Field::new("key", DataType::Utf8, false),
2299                            Field::new("value", DataType::Int32, true),
2300                        ])),
2301                        false,
2302                    )),
2303                    false,
2304                ),
2305                true,
2306            ))),
2307            true,
2308        );
2309
2310        // Deeply nested: struct containing list of structs containing maps
2311        let deeply_nested = Field::new(
2312            "deeply_nested",
2313            DataType::Struct(Fields::from(vec![
2314                Field::new("level1", DataType::Utf8, true),
2315                Field::new(
2316                    "level2",
2317                    DataType::List(Arc::new(Field::new(
2318                        "item",
2319                        DataType::Struct(Fields::from(vec![
2320                            Field::new("id", DataType::Int32, false),
2321                            Field::new(
2322                                "properties",
2323                                DataType::Map(
2324                                    Arc::new(Field::new(
2325                                        "entries",
2326                                        DataType::Struct(Fields::from(vec![
2327                                            Field::new("key", DataType::Utf8, false),
2328                                            Field::new("value", DataType::Float64, true),
2329                                        ])),
2330                                        false,
2331                                    )),
2332                                    false,
2333                                ),
2334                                true,
2335                            ),
2336                        ])),
2337                        true,
2338                    ))),
2339                    false,
2340                ),
2341            ])),
2342            true,
2343        );
2344
2345        let schema = DFSchema::from_unqualified_fields(
2346            vec![
2347                list_of_structs,
2348                struct_with_lists,
2349                map_with_struct_values,
2350                list_of_maps,
2351                deeply_nested,
2352            ]
2353            .into(),
2354            HashMap::new(),
2355        )
2356        .unwrap();
2357
2358        let output = schema.tree_string();
2359
2360        insta::assert_snapshot!(output, @r"
2361        root
2362         |-- list_of_structs: list (nullable = true)
2363         |    |-- item: struct (nullable = true)
2364         |    |    |-- id: int32 (nullable = false)
2365         |    |    |-- name: utf8 (nullable = true)
2366         |    |    |-- score: float64 (nullable = true)
2367         |-- struct_with_lists: struct (nullable = false)
2368         |    |-- tags: list (nullable = true)
2369         |    |    |-- item: utf8 (nullable = true)
2370         |    |-- scores: list (nullable = false)
2371         |    |    |-- item: int32 (nullable = true)
2372         |    |-- metadata: utf8 (nullable = true)
2373         |-- map_with_struct_values: map (nullable = true)
2374         |    |-- key: utf8 (nullable = false)
2375         |    |-- value: struct (nullable = true)
2376         |    |    |-- count: int64 (nullable = false)
2377         |    |    |-- active: boolean (nullable = true)
2378         |-- list_of_maps: list (nullable = true)
2379         |    |-- item: map (nullable = true)
2380         |    |    |-- key: utf8 (nullable = false)
2381         |    |    |-- value: int32 (nullable = false)
2382         |-- deeply_nested: struct (nullable = true)
2383         |    |-- level1: utf8 (nullable = true)
2384         |    |-- level2: list (nullable = false)
2385         |    |    |-- item: struct (nullable = true)
2386         |    |    |    |-- id: int32 (nullable = false)
2387         |    |    |    |-- properties: map (nullable = true)
2388         |    |    |    |    |-- key: utf8 (nullable = false)
2389         |    |    |    |    |-- value: float64 (nullable = false)
2390        ");
2391    }
2392
2393    #[test]
2394    fn test_print_schema_edge_case_types() {
2395        // Test edge cases and special types
2396        let schema = DFSchema::from_unqualified_fields(
2397            vec![
2398                Field::new("null_field", DataType::Null, true),
2399                Field::new("binary_field", DataType::Binary, false),
2400                Field::new("large_binary", DataType::LargeBinary, true),
2401                Field::new("large_utf8", DataType::LargeUtf8, false),
2402                Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
2403                Field::new(
2404                    "fixed_size_list",
2405                    DataType::FixedSizeList(
2406                        Arc::new(Field::new("item", DataType::Int32, true)),
2407                        5,
2408                    ),
2409                    false,
2410                ),
2411                Field::new("decimal32", DataType::Decimal32(9, 4), true),
2412                Field::new("decimal64", DataType::Decimal64(9, 4), true),
2413                Field::new("decimal128", DataType::Decimal128(18, 4), true),
2414                Field::new("decimal256", DataType::Decimal256(38, 10), false),
2415                Field::new("date32", DataType::Date32, true),
2416                Field::new("date64", DataType::Date64, false),
2417                Field::new(
2418                    "time32_seconds",
2419                    DataType::Time32(arrow::datatypes::TimeUnit::Second),
2420                    true,
2421                ),
2422                Field::new(
2423                    "time64_nanoseconds",
2424                    DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
2425                    false,
2426                ),
2427            ]
2428            .into(),
2429            HashMap::new(),
2430        )
2431        .unwrap();
2432
2433        let output = schema.tree_string();
2434
2435        insta::assert_snapshot!(output, @r"
2436        root
2437         |-- null_field: null (nullable = true)
2438         |-- binary_field: binary (nullable = false)
2439         |-- large_binary: large_binary (nullable = true)
2440         |-- large_utf8: large_utf8 (nullable = false)
2441         |-- fixed_size_binary: fixed_size_binary (nullable = true)
2442         |-- fixed_size_list: fixed size list (nullable = false)
2443         |    |-- item: int32 (nullable = true)
2444         |-- decimal32: decimal32(9, 4) (nullable = true)
2445         |-- decimal64: decimal64(9, 4) (nullable = true)
2446         |-- decimal128: decimal128(18, 4) (nullable = true)
2447         |-- decimal256: decimal256(38, 10) (nullable = false)
2448         |-- date32: date32 (nullable = true)
2449         |-- date64: date64 (nullable = false)
2450         |-- time32_seconds: time32 (nullable = true)
2451         |-- time64_nanoseconds: time64 (nullable = false)
2452        ");
2453    }
2454}