datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{_plan_err, _schema_err, DataFusionError, Result};
27use crate::{
28    Column, FunctionalDependencies, SchemaError, TableReference, field_not_found,
29    unqualified_field_not_found,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
37/// A reference-counted reference to a [DFSchema].
38pub type DFSchemaRef = Arc<DFSchema>;
39
40/// DFSchema wraps an Arrow schema and add a relation (table) name.
41///
42/// The schema may hold the fields across multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified fields must be unique not only amongst themselves, but also must
47/// have a distinct name from any qualified field names. This allows finding a
48/// qualified field by name to be possible, so long as there aren't multiple
49/// qualified fields with the same name.
50///]
51/// # See Also
52/// * [DFSchemaRef], an alias to `Arc<DFSchema>`
53/// * [DataTypeExt], common methods for working with Arrow [DataType]s
54/// * [FieldExt], extension methods for working with Arrow [Field]s
55///
56/// [DataTypeExt]: crate::datatype::DataTypeExt
57/// [FieldExt]: crate::datatype::FieldExt
58///
59/// # Creating qualified schemas
60///
61/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
62/// an Arrow schema.
63///
64/// ```rust
65/// use arrow::datatypes::{DataType, Field, Schema};
66/// use datafusion_common::{Column, DFSchema};
67///
68/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
69///
70/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
71/// let column = Column::from_qualified_name("t1.c1");
72/// assert!(df_schema.has_column(&column));
73///
74/// // Can also access qualified fields with unqualified name, if it's unambiguous
75/// let column = Column::from_qualified_name("c1");
76/// assert!(df_schema.has_column(&column));
77/// ```
78///
79/// # Creating unqualified schemas
80///
81/// Create an unqualified schema using TryFrom:
82///
83/// ```rust
84/// use arrow::datatypes::{DataType, Field, Schema};
85/// use datafusion_common::{Column, DFSchema};
86///
87/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
88///
89/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
90/// let column = Column::new_unqualified("c1");
91/// assert!(df_schema.has_column(&column));
92/// ```
93///
94/// # Converting back to Arrow schema
95///
96/// Use the `Into` trait to convert `DFSchema` into an Arrow schema:
97///
98/// ```rust
99/// use arrow::datatypes::{Field, Schema};
100/// use datafusion_common::DFSchema;
101/// use std::collections::HashMap;
102///
103/// let df_schema = DFSchema::from_unqualified_fields(
104///     vec![Field::new("c1", arrow::datatypes::DataType::Int32, false)].into(),
105///     HashMap::new(),
106/// )
107/// .unwrap();
108/// let schema: &Schema = df_schema.as_arrow();
109/// assert_eq!(schema.fields().len(), 1);
110/// ```
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct DFSchema {
113    /// Inner Arrow schema reference.
114    inner: SchemaRef,
115    /// Optional qualifiers for each column in this schema. In the same order as
116    /// the `self.inner.fields()`
117    field_qualifiers: Vec<Option<TableReference>>,
118    /// Stores functional dependencies in the schema.
119    functional_dependencies: FunctionalDependencies,
120}
121
122impl DFSchema {
123    /// Creates an empty `DFSchema`
124    pub fn empty() -> Self {
125        Self {
126            inner: Arc::new(Schema::new([])),
127            field_qualifiers: vec![],
128            functional_dependencies: FunctionalDependencies::empty(),
129        }
130    }
131
132    /// Return a reference to the inner Arrow [`Schema`]
133    ///
134    /// Note this does not have the qualifier information
135    pub fn as_arrow(&self) -> &Schema {
136        self.inner.as_ref()
137    }
138
139    /// Return a reference to the inner Arrow [`SchemaRef`]
140    ///
141    /// Note this does not have the qualifier information
142    pub fn inner(&self) -> &SchemaRef {
143        &self.inner
144    }
145
146    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
147    pub fn new_with_metadata(
148        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
149        metadata: HashMap<String, String>,
150    ) -> Result<Self> {
151        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
152            qualified_fields.into_iter().unzip();
153
154        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
155
156        let dfschema = Self {
157            inner: schema,
158            field_qualifiers: qualifiers,
159            functional_dependencies: FunctionalDependencies::empty(),
160        };
161        dfschema.check_names()?;
162        Ok(dfschema)
163    }
164
165    /// Create a new `DFSchema` from a list of Arrow [Field]s
166    pub fn from_unqualified_fields(
167        fields: Fields,
168        metadata: HashMap<String, String>,
169    ) -> Result<Self> {
170        let field_count = fields.len();
171        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
172        let dfschema = Self {
173            inner: schema,
174            field_qualifiers: vec![None; field_count],
175            functional_dependencies: FunctionalDependencies::empty(),
176        };
177        dfschema.check_names()?;
178        Ok(dfschema)
179    }
180
181    /// Create a `DFSchema` from an Arrow schema and a given qualifier
182    ///
183    /// To create a schema from an Arrow schema without a qualifier, use
184    /// `DFSchema::try_from`.
185    pub fn try_from_qualified_schema(
186        qualifier: impl Into<TableReference>,
187        schema: &Schema,
188    ) -> Result<Self> {
189        let qualifier = qualifier.into();
190        let schema = DFSchema {
191            inner: schema.clone().into(),
192            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
193            functional_dependencies: FunctionalDependencies::empty(),
194        };
195        schema.check_names()?;
196        Ok(schema)
197    }
198
199    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
200    pub fn from_field_specific_qualified_schema(
201        qualifiers: Vec<Option<TableReference>>,
202        schema: &SchemaRef,
203    ) -> Result<Self> {
204        let dfschema = Self {
205            inner: Arc::clone(schema),
206            field_qualifiers: qualifiers,
207            functional_dependencies: FunctionalDependencies::empty(),
208        };
209        dfschema.check_names()?;
210        Ok(dfschema)
211    }
212
213    /// Return the same schema, where all fields have a given qualifier.
214    pub fn with_field_specific_qualified_schema(
215        &self,
216        qualifiers: Vec<Option<TableReference>>,
217    ) -> Result<Self> {
218        if qualifiers.len() != self.fields().len() {
219            return _plan_err!(
220                "Number of qualifiers must match number of fields. Expected {}, got {}",
221                self.fields().len(),
222                qualifiers.len()
223            );
224        }
225        Ok(DFSchema {
226            inner: Arc::clone(&self.inner),
227            field_qualifiers: qualifiers,
228            functional_dependencies: self.functional_dependencies.clone(),
229        })
230    }
231
232    /// Check if the schema have some fields with the same name
233    pub fn check_names(&self) -> Result<()> {
234        let mut qualified_names = BTreeSet::new();
235        let mut unqualified_names = BTreeSet::new();
236
237        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
238            if let Some(qualifier) = qualifier {
239                if !qualified_names.insert((qualifier, field.name())) {
240                    return _schema_err!(SchemaError::DuplicateQualifiedField {
241                        qualifier: Box::new(qualifier.clone()),
242                        name: field.name().to_string(),
243                    });
244                }
245            } else if !unqualified_names.insert(field.name()) {
246                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
247                    name: field.name().to_string()
248                });
249            }
250        }
251
252        for (qualifier, name) in qualified_names {
253            if unqualified_names.contains(name) {
254                return _schema_err!(SchemaError::AmbiguousReference {
255                    field: Box::new(Column::new(Some(qualifier.clone()), name))
256                });
257            }
258        }
259        Ok(())
260    }
261
262    /// Assigns functional dependencies.
263    pub fn with_functional_dependencies(
264        mut self,
265        functional_dependencies: FunctionalDependencies,
266    ) -> Result<Self> {
267        if functional_dependencies.is_valid(self.inner.fields.len()) {
268            self.functional_dependencies = functional_dependencies;
269            Ok(self)
270        } else {
271            _plan_err!(
272                "Invalid functional dependency: {:?}",
273                functional_dependencies
274            )
275        }
276    }
277
278    /// Create a new schema that contains the fields from this schema followed by the fields
279    /// from the supplied schema. An error will be returned if there are duplicate field names.
280    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
281        let mut schema_builder = SchemaBuilder::new();
282        schema_builder.extend(self.inner.fields().iter().cloned());
283        schema_builder.extend(schema.fields().iter().cloned());
284        let new_schema = schema_builder.finish();
285
286        let mut new_metadata = self.inner.metadata.clone();
287        new_metadata.extend(schema.inner.metadata.clone());
288        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
289
290        let mut new_qualifiers = self.field_qualifiers.clone();
291        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
292
293        let new_self = Self {
294            inner: Arc::new(new_schema_with_metadata),
295            field_qualifiers: new_qualifiers,
296            functional_dependencies: FunctionalDependencies::empty(),
297        };
298        new_self.check_names()?;
299        Ok(new_self)
300    }
301
302    /// Modify this schema by appending the fields from the supplied schema, ignoring any
303    /// duplicate fields.
304    ///
305    /// ## Merge Precedence
306    ///
307    /// **Schema-level metadata**: Metadata from both schemas is merged.
308    /// If both schemas have the same metadata key, the value from the `other_schema` parameter takes precedence.
309    ///
310    /// **Field-level merging**: Only non-duplicate fields are added. This means that the
311    /// `self` fields will always take precedence over the `other_schema` fields.
312    /// Duplicate field detection is based on:
313    /// - For qualified fields: both qualifier and field name must match
314    /// - For unqualified fields: only field name needs to match
315    ///
316    /// Take note how the precedence for fields & metadata merging differs;
317    /// merging prefers fields from `self` but prefers metadata from `other_schema`.
318    pub fn merge(&mut self, other_schema: &DFSchema) {
319        if other_schema.inner.fields.is_empty() {
320            return;
321        }
322
323        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
324            self.iter().collect();
325        let self_unqualified_names: HashSet<&str> = self
326            .inner
327            .fields
328            .iter()
329            .map(|field| field.name().as_str())
330            .collect();
331
332        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
333        let mut qualifiers = Vec::new();
334        for (qualifier, field) in other_schema.iter() {
335            // skip duplicate columns
336            let duplicated_field = match qualifier {
337                Some(q) => self_fields.contains(&(Some(q), field)),
338                // for unqualified columns, check as unqualified name
339                None => self_unqualified_names.contains(field.name().as_str()),
340            };
341            if !duplicated_field {
342                schema_builder.push(Arc::clone(field));
343                qualifiers.push(qualifier.cloned());
344            }
345        }
346        let mut metadata = self.inner.metadata.clone();
347        metadata.extend(other_schema.inner.metadata.clone());
348
349        let finished = schema_builder.finish();
350        let finished_with_metadata = finished.with_metadata(metadata);
351        self.inner = finished_with_metadata.into();
352        self.field_qualifiers.extend(qualifiers);
353    }
354
355    /// Get a list of fields for this schema
356    pub fn fields(&self) -> &Fields {
357        &self.inner.fields
358    }
359
360    /// Returns a reference to [`FieldRef`] for a column at specific index
361    /// within the schema.
362    ///
363    /// See also [Self::qualified_field] to get both qualifier and field
364    pub fn field(&self, i: usize) -> &FieldRef {
365        &self.inner.fields[i]
366    }
367
368    /// Returns the qualifier (if any) and [`FieldRef`] for a column at specific
369    /// index within the schema.
370    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &FieldRef) {
371        (self.field_qualifiers[i].as_ref(), self.field(i))
372    }
373
374    pub fn index_of_column_by_name(
375        &self,
376        qualifier: Option<&TableReference>,
377        name: &str,
378    ) -> Option<usize> {
379        let mut matches = self
380            .iter()
381            .enumerate()
382            .filter(|(_, (q, f))| match (qualifier, q) {
383                // field to lookup is qualified.
384                // current field is qualified and not shared between relations, compare both
385                // qualifier and name.
386                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
387                // field to lookup is qualified but current field is unqualified.
388                (Some(_), None) => false,
389                // field to lookup is unqualified, no need to compare qualifier
390                (None, Some(_)) | (None, None) => f.name() == name,
391            })
392            .map(|(idx, _)| idx);
393        matches.next()
394    }
395
396    /// Find the index of the column with the given qualifier and name,
397    /// returning `None` if not found
398    ///
399    /// See [Self::index_of_column] for a version that returns an error if the
400    /// column is not found
401    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
402        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
403    }
404
405    /// Find the index of the column with the given qualifier and name,
406    /// returning `Err` if not found
407    ///
408    /// See [Self::maybe_index_of_column] for a version that returns `None` if
409    /// the column is not found
410    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
411        self.maybe_index_of_column(col)
412            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
413    }
414
415    /// Check if the column is in the current schema
416    pub fn is_column_from_schema(&self, col: &Column) -> bool {
417        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
418            .is_some()
419    }
420
421    /// Find the [`FieldRef`] with the given name and optional qualifier
422    pub fn field_with_name(
423        &self,
424        qualifier: Option<&TableReference>,
425        name: &str,
426    ) -> Result<&FieldRef> {
427        if let Some(qualifier) = qualifier {
428            self.field_with_qualified_name(qualifier, name)
429        } else {
430            self.field_with_unqualified_name(name)
431        }
432    }
433
434    /// Find the qualified field with the given name
435    pub fn qualified_field_with_name(
436        &self,
437        qualifier: Option<&TableReference>,
438        name: &str,
439    ) -> Result<(Option<&TableReference>, &FieldRef)> {
440        if let Some(qualifier) = qualifier {
441            let idx = self
442                .index_of_column_by_name(Some(qualifier), name)
443                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
444            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
445        } else {
446            self.qualified_field_with_unqualified_name(name)
447        }
448    }
449
450    /// Find all fields having the given qualifier
451    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&FieldRef> {
452        self.iter()
453            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
454            .map(|(_, f)| f)
455            .collect()
456    }
457
458    /// Find all fields indices having the given qualifier
459    pub fn fields_indices_with_qualified(
460        &self,
461        qualifier: &TableReference,
462    ) -> Vec<usize> {
463        self.iter()
464            .enumerate()
465            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
466            .collect()
467    }
468
469    /// Find all fields that match the given name
470    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&FieldRef> {
471        self.fields()
472            .iter()
473            .filter(|field| field.name() == name)
474            .collect()
475    }
476
477    /// Find all fields that match the given name and return them with their qualifier
478    pub fn qualified_fields_with_unqualified_name(
479        &self,
480        name: &str,
481    ) -> Vec<(Option<&TableReference>, &FieldRef)> {
482        self.iter()
483            .filter(|(_, field)| field.name() == name)
484            .collect()
485    }
486
487    /// Find all fields that match the given name and convert to column
488    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
489        self.iter()
490            .filter(|(_, field)| field.name() == name)
491            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
492            .collect()
493    }
494
495    /// Return all `Column`s for the schema
496    pub fn columns(&self) -> Vec<Column> {
497        self.iter()
498            .map(|(qualifier, field)| {
499                Column::new(qualifier.cloned(), field.name().clone())
500            })
501            .collect()
502    }
503
504    /// Find the qualified field with the given unqualified name
505    pub fn qualified_field_with_unqualified_name(
506        &self,
507        name: &str,
508    ) -> Result<(Option<&TableReference>, &FieldRef)> {
509        let matches = self.qualified_fields_with_unqualified_name(name);
510        match matches.len() {
511            0 => Err(unqualified_field_not_found(name, self)),
512            1 => Ok((matches[0].0, matches[0].1)),
513            _ => {
514                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
515                // Because name may generate from Alias/... . It means that it don't own qualifier.
516                // For example:
517                //             Join on id = b.id
518                // Project a.id as id   TableScan b id
519                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
520                // one field without qualifier, we should return it.
521                let fields_without_qualifier = matches
522                    .iter()
523                    .filter(|(q, _)| q.is_none())
524                    .collect::<Vec<_>>();
525                if fields_without_qualifier.len() == 1 {
526                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
527                } else {
528                    _schema_err!(SchemaError::AmbiguousReference {
529                        field: Box::new(Column::new_unqualified(name.to_string()))
530                    })
531                }
532            }
533        }
534    }
535
536    /// Find the field with the given name
537    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&FieldRef> {
538        self.qualified_field_with_unqualified_name(name)
539            .map(|(_, field)| field)
540    }
541
542    /// Find the field with the given qualified name
543    pub fn field_with_qualified_name(
544        &self,
545        qualifier: &TableReference,
546        name: &str,
547    ) -> Result<&FieldRef> {
548        let idx = self
549            .index_of_column_by_name(Some(qualifier), name)
550            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
551
552        Ok(self.field(idx))
553    }
554
555    /// Find the field with the given qualified column
556    pub fn qualified_field_from_column(
557        &self,
558        column: &Column,
559    ) -> Result<(Option<&TableReference>, &FieldRef)> {
560        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
561    }
562
563    /// Find if the field exists with the given name
564    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
565        self.fields().iter().any(|field| field.name() == name)
566    }
567
568    /// Find if the field exists with the given qualified name
569    pub fn has_column_with_qualified_name(
570        &self,
571        qualifier: &TableReference,
572        name: &str,
573    ) -> bool {
574        self.iter()
575            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
576    }
577
578    /// Find if the field exists with the given qualified column
579    pub fn has_column(&self, column: &Column) -> bool {
580        match &column.relation {
581            Some(r) => self.has_column_with_qualified_name(r, &column.name),
582            None => self.has_column_with_unqualified_name(&column.name),
583        }
584    }
585
586    /// Check to see if unqualified field names matches field names in Arrow schema
587    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
588        self.inner
589            .fields
590            .iter()
591            .zip(arrow_schema.fields().iter())
592            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
593    }
594
595    /// Check to see if fields in 2 Arrow schemas are compatible
596    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
597    pub fn check_arrow_schema_type_compatible(
598        &self,
599        arrow_schema: &Schema,
600    ) -> Result<()> {
601        let self_arrow_schema = self.as_arrow();
602        self_arrow_schema
603            .fields()
604            .iter()
605            .zip(arrow_schema.fields().iter())
606            .try_for_each(|(l_field, r_field)| {
607                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
608                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
609                                r_field.name(),
610                                r_field.data_type(),
611                                l_field.name(),
612                                l_field.data_type())
613                } else {
614                    Ok(())
615                }
616            })
617    }
618
619    /// Returns true if the two schemas have the same qualified named
620    /// fields with logically equivalent data types. Returns false otherwise.
621    ///
622    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
623    /// equivalence checking.
624    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
625        if self.fields().len() != other.fields().len() {
626            return false;
627        }
628        let self_fields = self.iter();
629        let other_fields = other.iter();
630        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
631            q1 == q2
632                && f1.name() == f2.name()
633                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
634        })
635    }
636
637    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
638    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
639        self.has_equivalent_names_and_types(other).is_ok()
640    }
641
642    /// Returns Ok if the two schemas have the same qualified named
643    /// fields with the compatible data types.
644    ///
645    /// Returns an `Err` with a message otherwise.
646    ///
647    /// This is a specialized version of Eq that ignores differences in
648    /// nullability and metadata.
649    ///
650    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
651    /// logical type checking, which for example would consider a dictionary
652    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
653    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
654        // case 1 : schema length mismatch
655        if self.fields().len() != other.fields().len() {
656            _plan_err!(
657                "Schema mismatch: the schema length are not same \
658            Expected schema length: {}, got: {}",
659                self.fields().len(),
660                other.fields().len()
661            )
662        } else {
663            // case 2 : schema length match, but fields mismatch
664            // check if the fields name are the same and have the same data types
665            self.fields()
666                .iter()
667                .zip(other.fields().iter())
668                .try_for_each(|(f1, f2)| {
669                    if f1.name() != f2.name()
670                        || (!DFSchema::datatype_is_semantically_equal(
671                            f1.data_type(),
672                            f2.data_type(),
673                        ))
674                    {
675                        _plan_err!(
676                            "Schema mismatch: Expected field '{}' with type {}, \
677                            but got '{}' with type {}.",
678                            f1.name(),
679                            f1.data_type(),
680                            f2.name(),
681                            f2.data_type()
682                        )
683                    } else {
684                        Ok(())
685                    }
686                })
687        }
688    }
689
690    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
691    /// than datatype_is_semantically_equal in that different representations of same data can be
692    /// logically but not semantically equivalent. Semantically equivalent types are always also
693    /// logically equivalent. For example:
694    /// - a Dictionary<K,V> type is logically equal to a plain V type
695    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
696    /// - Utf8 and Utf8View are logically equal
697    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
698        // check nested fields
699        match (dt1, dt2) {
700            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
701                v1.as_ref() == v2.as_ref()
702            }
703            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
704            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
705            (DataType::List(f1), DataType::List(f2))
706            | (DataType::LargeList(f1), DataType::LargeList(f2))
707            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
708                // Don't compare the names of the technical inner field
709                // Usually "item" but that's not mandated
710                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
711            }
712            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
713                // Don't compare the names of the technical inner fields
714                // Usually "entries", "key", "value" but that's not mandated
715                match (f1.data_type(), f2.data_type()) {
716                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
717                        f1_inner.len() == f2_inner.len()
718                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
719                                Self::datatype_is_logically_equal(
720                                    f1.data_type(),
721                                    f2.data_type(),
722                                )
723                            })
724                    }
725                    _ => panic!("Map type should have an inner struct field"),
726                }
727            }
728            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
729                let iter1 = fields1.iter();
730                let iter2 = fields2.iter();
731                fields1.len() == fields2.len() &&
732                        // all fields have to be the same
733                    iter1
734                    .zip(iter2)
735                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
736            }
737            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
738                let iter1 = fields1.iter();
739                let iter2 = fields2.iter();
740                fields1.len() == fields2.len() &&
741                    // all fields have to be the same
742                    iter1
743                        .zip(iter2)
744                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
745            }
746            // Utf8 and Utf8View are logically equivalent
747            (DataType::Utf8, DataType::Utf8View) => true,
748            (DataType::Utf8View, DataType::Utf8) => true,
749            _ => Self::datatype_is_semantically_equal(dt1, dt2),
750        }
751    }
752
753    /// Returns true of two [`DataType`]s are semantically equal (same
754    /// name and type), ignoring both metadata and nullability, decimal precision/scale,
755    /// and timezone time units/timezones.
756    ///
757    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
758    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
759        // check nested fields
760        match (dt1, dt2) {
761            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
762                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
763                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
764            }
765            (DataType::List(f1), DataType::List(f2))
766            | (DataType::LargeList(f1), DataType::LargeList(f2))
767            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
768                // Don't compare the names of the technical inner field
769                // Usually "item" but that's not mandated
770                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
771            }
772            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
773                // Don't compare the names of the technical inner fields
774                // Usually "entries", "key", "value" but that's not mandated
775                match (f1.data_type(), f2.data_type()) {
776                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
777                        f1_inner.len() == f2_inner.len()
778                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
779                                Self::datatype_is_semantically_equal(
780                                    f1.data_type(),
781                                    f2.data_type(),
782                                )
783                            })
784                    }
785                    _ => panic!("Map type should have an inner struct field"),
786                }
787            }
788            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
789                let iter1 = fields1.iter();
790                let iter2 = fields2.iter();
791                fields1.len() == fields2.len() &&
792                        // all fields have to be the same
793                    iter1
794                    .zip(iter2)
795                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
796            }
797            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
798                let iter1 = fields1.iter();
799                let iter2 = fields2.iter();
800                fields1.len() == fields2.len() &&
801                    // all fields have to be the same
802                    iter1
803                        .zip(iter2)
804                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
805            }
806            (
807                DataType::Decimal32(_l_precision, _l_scale),
808                DataType::Decimal32(_r_precision, _r_scale),
809            ) => true,
810            (
811                DataType::Decimal64(_l_precision, _l_scale),
812                DataType::Decimal64(_r_precision, _r_scale),
813            ) => true,
814            (
815                DataType::Decimal128(_l_precision, _l_scale),
816                DataType::Decimal128(_r_precision, _r_scale),
817            ) => true,
818            (
819                DataType::Decimal256(_l_precision, _l_scale),
820                DataType::Decimal256(_r_precision, _r_scale),
821            ) => true,
822            (
823                DataType::Timestamp(_l_time_unit, _l_timezone),
824                DataType::Timestamp(_r_time_unit, _r_timezone),
825            ) => true,
826            _ => dt1 == dt2,
827        }
828    }
829
830    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
831        f1.name() == f2.name()
832            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
833    }
834
835    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
836        f1.name() == f2.name()
837            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
838    }
839
840    /// Strip all field qualifier in schema
841    pub fn strip_qualifiers(self) -> Self {
842        DFSchema {
843            field_qualifiers: vec![None; self.inner.fields.len()],
844            inner: self.inner,
845            functional_dependencies: self.functional_dependencies,
846        }
847    }
848
849    /// Replace all field qualifier with new value in schema
850    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
851        let qualifier = qualifier.into();
852        DFSchema {
853            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
854            inner: self.inner,
855            functional_dependencies: self.functional_dependencies,
856        }
857    }
858
859    /// Get list of fully-qualified field names in this schema
860    pub fn field_names(&self) -> Vec<String> {
861        self.iter()
862            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
863            .collect::<Vec<_>>()
864    }
865
866    /// Get metadata of this schema
867    pub fn metadata(&self) -> &HashMap<String, String> {
868        &self.inner.metadata
869    }
870
871    /// Get functional dependencies
872    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
873        &self.functional_dependencies
874    }
875
876    /// Iterate over the qualifiers and fields in the DFSchema
877    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
878        self.field_qualifiers
879            .iter()
880            .zip(self.inner.fields().iter())
881            .map(|(qualifier, field)| (qualifier.as_ref(), field))
882    }
883    /// Returns a tree-like string representation of the schema.
884    ///
885    /// This method formats the schema
886    /// with a tree-like structure showing field names, types, and nullability.
887    ///
888    /// # Example
889    ///
890    /// ```
891    /// use arrow::datatypes::{DataType, Field, Schema};
892    /// use datafusion_common::DFSchema;
893    /// use std::collections::HashMap;
894    ///
895    /// let schema = DFSchema::from_unqualified_fields(
896    ///     vec![
897    ///         Field::new("id", DataType::Int32, false),
898    ///         Field::new("name", DataType::Utf8, true),
899    ///     ]
900    ///     .into(),
901    ///     HashMap::new(),
902    /// )
903    /// .unwrap();
904    ///
905    /// assert_eq!(
906    ///     schema.tree_string().to_string(),
907    ///     r#"root
908    ///  |-- id: int32 (nullable = false)
909    ///  |-- name: utf8 (nullable = true)"#
910    /// );
911    /// ```
912    pub fn tree_string(&self) -> impl Display + '_ {
913        let mut result = String::from("root\n");
914
915        for (qualifier, field) in self.iter() {
916            let field_name = match qualifier {
917                Some(q) => format!("{}.{}", q, field.name()),
918                None => field.name().to_string(),
919            };
920
921            format_field_with_indent(
922                &mut result,
923                &field_name,
924                field.data_type(),
925                field.is_nullable(),
926                " ",
927            );
928        }
929
930        // Remove the trailing newline
931        if result.ends_with('\n') {
932            result.pop();
933        }
934
935        result
936    }
937}
938
939/// Format field with proper nested indentation for complex types
940fn format_field_with_indent(
941    result: &mut String,
942    field_name: &str,
943    data_type: &DataType,
944    nullable: bool,
945    indent: &str,
946) {
947    let nullable_str = nullable.to_string().to_lowercase();
948    let child_indent = format!("{indent}|    ");
949
950    match data_type {
951        DataType::List(field) => {
952            result.push_str(&format!(
953                "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
954            ));
955            format_field_with_indent(
956                result,
957                field.name(),
958                field.data_type(),
959                field.is_nullable(),
960                &child_indent,
961            );
962        }
963        DataType::LargeList(field) => {
964            result.push_str(&format!(
965                "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
966            ));
967            format_field_with_indent(
968                result,
969                field.name(),
970                field.data_type(),
971                field.is_nullable(),
972                &child_indent,
973            );
974        }
975        DataType::FixedSizeList(field, _size) => {
976            result.push_str(&format!(
977                "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
978            ));
979            format_field_with_indent(
980                result,
981                field.name(),
982                field.data_type(),
983                field.is_nullable(),
984                &child_indent,
985            );
986        }
987        DataType::Map(field, _) => {
988            result.push_str(&format!(
989                "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
990            ));
991            if let DataType::Struct(inner_fields) = field.data_type()
992                && inner_fields.len() == 2
993            {
994                format_field_with_indent(
995                    result,
996                    "key",
997                    inner_fields[0].data_type(),
998                    inner_fields[0].is_nullable(),
999                    &child_indent,
1000                );
1001                let value_contains_null = field.is_nullable().to_string().to_lowercase();
1002                // Handle complex value types properly
1003                match inner_fields[1].data_type() {
1004                    DataType::Struct(_)
1005                    | DataType::List(_)
1006                    | DataType::LargeList(_)
1007                    | DataType::FixedSizeList(_, _)
1008                    | DataType::Map(_, _) => {
1009                        format_field_with_indent(
1010                            result,
1011                            "value",
1012                            inner_fields[1].data_type(),
1013                            inner_fields[1].is_nullable(),
1014                            &child_indent,
1015                        );
1016                    }
1017                    _ => {
1018                        result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1019                                format_simple_data_type(inner_fields[1].data_type())));
1020                    }
1021                }
1022            }
1023        }
1024        DataType::Struct(fields) => {
1025            result.push_str(&format!(
1026                "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1027            ));
1028            for struct_field in fields {
1029                format_field_with_indent(
1030                    result,
1031                    struct_field.name(),
1032                    struct_field.data_type(),
1033                    struct_field.is_nullable(),
1034                    &child_indent,
1035                );
1036            }
1037        }
1038        _ => {
1039            let type_str = format_simple_data_type(data_type);
1040            result.push_str(&format!(
1041                "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1042            ));
1043        }
1044    }
1045}
1046
1047/// Format simple DataType in lowercase format (for leaf nodes)
1048fn format_simple_data_type(data_type: &DataType) -> String {
1049    match data_type {
1050        DataType::Boolean => "boolean".to_string(),
1051        DataType::Int8 => "int8".to_string(),
1052        DataType::Int16 => "int16".to_string(),
1053        DataType::Int32 => "int32".to_string(),
1054        DataType::Int64 => "int64".to_string(),
1055        DataType::UInt8 => "uint8".to_string(),
1056        DataType::UInt16 => "uint16".to_string(),
1057        DataType::UInt32 => "uint32".to_string(),
1058        DataType::UInt64 => "uint64".to_string(),
1059        DataType::Float16 => "float16".to_string(),
1060        DataType::Float32 => "float32".to_string(),
1061        DataType::Float64 => "float64".to_string(),
1062        DataType::Utf8 => "utf8".to_string(),
1063        DataType::LargeUtf8 => "large_utf8".to_string(),
1064        DataType::Binary => "binary".to_string(),
1065        DataType::LargeBinary => "large_binary".to_string(),
1066        DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1067        DataType::Date32 => "date32".to_string(),
1068        DataType::Date64 => "date64".to_string(),
1069        DataType::Time32(_) => "time32".to_string(),
1070        DataType::Time64(_) => "time64".to_string(),
1071        DataType::Timestamp(_, tz) => match tz {
1072            Some(tz_str) => format!("timestamp ({tz_str})"),
1073            None => "timestamp".to_string(),
1074        },
1075        DataType::Interval(_) => "interval".to_string(),
1076        DataType::Dictionary(_, value_type) => {
1077            format_simple_data_type(value_type.as_ref())
1078        }
1079        DataType::Decimal32(precision, scale) => {
1080            format!("decimal32({precision}, {scale})")
1081        }
1082        DataType::Decimal64(precision, scale) => {
1083            format!("decimal64({precision}, {scale})")
1084        }
1085        DataType::Decimal128(precision, scale) => {
1086            format!("decimal128({precision}, {scale})")
1087        }
1088        DataType::Decimal256(precision, scale) => {
1089            format!("decimal256({precision}, {scale})")
1090        }
1091        DataType::Null => "null".to_string(),
1092        _ => format!("{data_type}").to_lowercase(),
1093    }
1094}
1095
1096/// Allow DFSchema to be converted into an Arrow `&Schema`
1097impl AsRef<Schema> for DFSchema {
1098    fn as_ref(&self) -> &Schema {
1099        self.as_arrow()
1100    }
1101}
1102
1103/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
1104/// example)
1105impl AsRef<SchemaRef> for DFSchema {
1106    fn as_ref(&self) -> &SchemaRef {
1107        self.inner()
1108    }
1109}
1110
1111/// Create a `DFSchema` from an Arrow schema
1112impl TryFrom<Schema> for DFSchema {
1113    type Error = DataFusionError;
1114    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
1115        Self::try_from(Arc::new(schema))
1116    }
1117}
1118
1119impl TryFrom<SchemaRef> for DFSchema {
1120    type Error = DataFusionError;
1121    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1122        let field_count = schema.fields.len();
1123        let dfschema = Self {
1124            inner: schema,
1125            field_qualifiers: vec![None; field_count],
1126            functional_dependencies: FunctionalDependencies::empty(),
1127        };
1128        // Without checking names, because schema here may have duplicate field names.
1129        // For example, Partial AggregateMode will generate duplicate field names from
1130        // state_fields.
1131        // See <https://github.com/apache/datafusion/issues/17715>
1132        // dfschema.check_names()?;
1133        Ok(dfschema)
1134    }
1135}
1136
1137// Hashing refers to a subset of fields considered in PartialEq.
1138impl Hash for DFSchema {
1139    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1140        self.inner.fields.hash(state);
1141        self.inner.metadata.len().hash(state); // HashMap is not hashable
1142    }
1143}
1144
1145/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
1146pub trait ToDFSchema
1147where
1148    Self: Sized,
1149{
1150    /// Attempt to create a DSSchema
1151    fn to_dfschema(self) -> Result<DFSchema>;
1152
1153    /// Attempt to create a DSSchemaRef
1154    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1155        Ok(Arc::new(self.to_dfschema()?))
1156    }
1157}
1158
1159impl ToDFSchema for Schema {
1160    fn to_dfschema(self) -> Result<DFSchema> {
1161        DFSchema::try_from(self)
1162    }
1163}
1164
1165impl ToDFSchema for SchemaRef {
1166    fn to_dfschema(self) -> Result<DFSchema> {
1167        DFSchema::try_from(self)
1168    }
1169}
1170
1171impl ToDFSchema for Vec<Field> {
1172    fn to_dfschema(self) -> Result<DFSchema> {
1173        let field_count = self.len();
1174        let schema = Schema {
1175            fields: self.into(),
1176            metadata: HashMap::new(),
1177        };
1178        let dfschema = DFSchema {
1179            inner: schema.into(),
1180            field_qualifiers: vec![None; field_count],
1181            functional_dependencies: FunctionalDependencies::empty(),
1182        };
1183        Ok(dfschema)
1184    }
1185}
1186
1187impl Display for DFSchema {
1188    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1189        write!(
1190            f,
1191            "fields:[{}], metadata:{:?}",
1192            self.iter()
1193                .map(|(q, f)| qualified_name(q, f.name()))
1194                .collect::<Vec<String>>()
1195                .join(", "),
1196            self.inner.metadata
1197        )
1198    }
1199}
1200
1201/// Provides schema information needed by certain methods of `Expr`
1202/// (defined in the datafusion-common crate).
1203///
1204/// Note that this trait is implemented for &[DFSchema] which is
1205/// widely used in the DataFusion codebase.
1206pub trait ExprSchema: std::fmt::Debug {
1207    /// Is this column reference nullable?
1208    fn nullable(&self, col: &Column) -> Result<bool> {
1209        Ok(self.field_from_column(col)?.is_nullable())
1210    }
1211
1212    /// What is the datatype of this column?
1213    fn data_type(&self, col: &Column) -> Result<&DataType> {
1214        Ok(self.field_from_column(col)?.data_type())
1215    }
1216
1217    /// Returns the column's optional metadata.
1218    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1219        Ok(self.field_from_column(col)?.metadata())
1220    }
1221
1222    /// Return the column's datatype and nullability
1223    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1224        let field = self.field_from_column(col)?;
1225        Ok((field.data_type(), field.is_nullable()))
1226    }
1227
1228    // Return the column's field
1229    fn field_from_column(&self, col: &Column) -> Result<&FieldRef>;
1230}
1231
1232// Implement `ExprSchema` for `Arc<DFSchema>`
1233impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1234    fn nullable(&self, col: &Column) -> Result<bool> {
1235        self.as_ref().nullable(col)
1236    }
1237
1238    fn data_type(&self, col: &Column) -> Result<&DataType> {
1239        self.as_ref().data_type(col)
1240    }
1241
1242    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1243        ExprSchema::metadata(self.as_ref(), col)
1244    }
1245
1246    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1247        self.as_ref().data_type_and_nullable(col)
1248    }
1249
1250    fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1251        self.as_ref().field_from_column(col)
1252    }
1253}
1254
1255impl ExprSchema for DFSchema {
1256    fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1257        match &col.relation {
1258            Some(r) => self.field_with_qualified_name(r, &col.name),
1259            None => self.field_with_unqualified_name(&col.name),
1260        }
1261    }
1262}
1263
1264/// DataFusion-specific extensions to [`Schema`].
1265pub trait SchemaExt {
1266    /// This is a specialized version of Eq that ignores differences
1267    /// in nullability and metadata.
1268    ///
1269    /// It works the same as [`DFSchema::equivalent_names_and_types`].
1270    fn equivalent_names_and_types(&self, other: &Self) -> bool;
1271
1272    /// Returns nothing if the two schemas have the same qualified named
1273    /// fields with logically equivalent data types. Returns internal error otherwise.
1274    ///
1275    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
1276    /// equivalence checking.
1277    ///
1278    /// It is only used by insert into cases.
1279    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
1280}
1281
1282impl SchemaExt for Schema {
1283    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1284        if self.fields().len() != other.fields().len() {
1285            return false;
1286        }
1287
1288        self.fields()
1289            .iter()
1290            .zip(other.fields().iter())
1291            .all(|(f1, f2)| {
1292                f1.name() == f2.name()
1293                    && DFSchema::datatype_is_semantically_equal(
1294                        f1.data_type(),
1295                        f2.data_type(),
1296                    )
1297            })
1298    }
1299
1300    // It is only used by insert into cases.
1301    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1302        // case 1 : schema length mismatch
1303        if self.fields().len() != other.fields().len() {
1304            _plan_err!(
1305                "Inserting query must have the same schema length as the table. \
1306            Expected table schema length: {}, got: {}",
1307                self.fields().len(),
1308                other.fields().len()
1309            )
1310        } else {
1311            // case 2 : schema length match, but fields mismatch
1312            // check if the fields name are the same and have the same data types
1313            self.fields()
1314                .iter()
1315                .zip(other.fields().iter())
1316                .try_for_each(|(f1, f2)| {
1317                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1318                        _plan_err!(
1319                            "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1320                            but got '{}' with type {}.",
1321                            f1.name(),
1322                            f1.data_type(),
1323                            f2.name(),
1324                            f2.data_type())
1325                    } else {
1326                        Ok(())
1327                    }
1328                })
1329        }
1330    }
1331}
1332
1333pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1334    match qualifier {
1335        Some(q) => format!("{q}.{name}"),
1336        None => name.to_string(),
1337    }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342    use crate::assert_contains;
1343
1344    use super::*;
1345
1346    #[test]
1347    fn qualifier_in_name() -> Result<()> {
1348        let col = Column::from_name("t1.c0");
1349        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1350        // lookup with unqualified name "t1.c0"
1351        let err = schema.index_of_column(&col).unwrap_err();
1352        let expected = "Schema error: No field named \"t1.c0\". \
1353            Column names are case sensitive. \
1354            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1355            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1356            Did you mean 't1.c0'?.";
1357        assert_eq!(err.strip_backtrace(), expected);
1358        Ok(())
1359    }
1360
1361    #[test]
1362    fn quoted_qualifiers_in_name() -> Result<()> {
1363        let col = Column::from_name("t1.c0");
1364        let schema = DFSchema::try_from_qualified_schema(
1365            "t1",
1366            &Schema::new(vec![
1367                Field::new("CapitalColumn", DataType::Boolean, true),
1368                Field::new("field.with.period", DataType::Boolean, true),
1369            ]),
1370        )?;
1371
1372        // lookup with unqualified name "t1.c0"
1373        let err = schema.index_of_column(&col).unwrap_err();
1374        let expected = "Schema error: No field named \"t1.c0\". \
1375            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1376        assert_eq!(err.strip_backtrace(), expected);
1377        Ok(())
1378    }
1379
1380    #[test]
1381    fn from_unqualified_schema() -> Result<()> {
1382        let schema = DFSchema::try_from(test_schema_1())?;
1383        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1384        Ok(())
1385    }
1386
1387    #[test]
1388    fn from_qualified_schema() -> Result<()> {
1389        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1390        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1391        Ok(())
1392    }
1393
1394    #[test]
1395    fn test_from_field_specific_qualified_schema() -> Result<()> {
1396        let schema = DFSchema::from_field_specific_qualified_schema(
1397            vec![Some("t1".into()), None],
1398            &Arc::new(Schema::new(vec![
1399                Field::new("c0", DataType::Boolean, true),
1400                Field::new("c1", DataType::Boolean, true),
1401            ])),
1402        )?;
1403        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1404        Ok(())
1405    }
1406
1407    #[test]
1408    fn test_from_qualified_fields() -> Result<()> {
1409        let schema = DFSchema::new_with_metadata(
1410            vec![
1411                (
1412                    Some("t0".into()),
1413                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1414                ),
1415                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1416            ],
1417            HashMap::new(),
1418        )?;
1419        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1420        Ok(())
1421    }
1422
1423    #[test]
1424    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1425        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1426        let arrow_schema = schema.as_arrow();
1427        insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
1428        Ok(())
1429    }
1430
1431    #[test]
1432    fn join_qualified() -> Result<()> {
1433        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1434        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1435        let join = left.join(&right)?;
1436        assert_eq!(
1437            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1438            join.to_string()
1439        );
1440        // test valid access
1441        assert!(
1442            join.field_with_qualified_name(&TableReference::bare("t1"), "c0")
1443                .is_ok()
1444        );
1445        assert!(
1446            join.field_with_qualified_name(&TableReference::bare("t2"), "c0")
1447                .is_ok()
1448        );
1449        // test invalid access
1450        assert!(join.field_with_unqualified_name("c0").is_err());
1451        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1452        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1453        Ok(())
1454    }
1455
1456    #[test]
1457    fn join_qualified_duplicate() -> Result<()> {
1458        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1459        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1460        let join = left.join(&right);
1461        assert_eq!(
1462            join.unwrap_err().strip_backtrace(),
1463            "Schema error: Schema contains duplicate qualified field name t1.c0",
1464        );
1465        Ok(())
1466    }
1467
1468    #[test]
1469    fn join_unqualified_duplicate() -> Result<()> {
1470        let left = DFSchema::try_from(test_schema_1())?;
1471        let right = DFSchema::try_from(test_schema_1())?;
1472        let join = left.join(&right);
1473        assert_eq!(
1474            join.unwrap_err().strip_backtrace(),
1475            "Schema error: Schema contains duplicate unqualified field name c0"
1476        );
1477        Ok(())
1478    }
1479
1480    #[test]
1481    fn join_mixed() -> Result<()> {
1482        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1483        let right = DFSchema::try_from(test_schema_2())?;
1484        let join = left.join(&right)?;
1485        assert_eq!(
1486            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1487            join.to_string()
1488        );
1489        // test valid access
1490        assert!(
1491            join.field_with_qualified_name(&TableReference::bare("t1"), "c0")
1492                .is_ok()
1493        );
1494        assert!(join.field_with_unqualified_name("c0").is_ok());
1495        assert!(join.field_with_unqualified_name("c100").is_ok());
1496        assert!(join.field_with_name(None, "c100").is_ok());
1497        // test invalid access
1498        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1499        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1500        assert!(
1501            join.field_with_qualified_name(&TableReference::bare(""), "c100")
1502                .is_err()
1503        );
1504        Ok(())
1505    }
1506
1507    #[test]
1508    fn join_mixed_duplicate() -> Result<()> {
1509        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1510        let right = DFSchema::try_from(test_schema_1())?;
1511        let join = left.join(&right);
1512        assert_contains!(
1513            join.unwrap_err().to_string(),
1514            "Schema error: Schema contains qualified \
1515                          field name t1.c0 and unqualified field name c0 which would be ambiguous"
1516        );
1517        Ok(())
1518    }
1519
1520    #[test]
1521    fn helpful_error_messages() -> Result<()> {
1522        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1523        let expected_help = "Valid fields are t1.c0, t1.c1.";
1524        assert_contains!(
1525            schema
1526                .field_with_qualified_name(&TableReference::bare("x"), "y")
1527                .unwrap_err()
1528                .to_string(),
1529            expected_help
1530        );
1531        assert_contains!(
1532            schema
1533                .field_with_unqualified_name("y")
1534                .unwrap_err()
1535                .to_string(),
1536            expected_help
1537        );
1538        assert!(schema.index_of_column_by_name(None, "y").is_none());
1539        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1540
1541        Ok(())
1542    }
1543
1544    #[test]
1545    fn select_without_valid_fields() {
1546        let schema = DFSchema::empty();
1547
1548        let col = Column::from_qualified_name("t1.c0");
1549        let err = schema.index_of_column(&col).unwrap_err();
1550        let expected = "Schema error: No field named t1.c0.";
1551        assert_eq!(err.strip_backtrace(), expected);
1552
1553        // the same check without qualifier
1554        let col = Column::from_name("c0");
1555        let err = schema.index_of_column(&col).err().unwrap();
1556        let expected = "Schema error: No field named c0.";
1557        assert_eq!(err.strip_backtrace(), expected);
1558    }
1559
1560    #[test]
1561    fn into() {
1562        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1563        let arrow_schema = Schema::new_with_metadata(
1564            vec![Field::new("c0", DataType::Int64, true)],
1565            test_metadata(),
1566        );
1567        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1568
1569        let df_schema = DFSchema {
1570            inner: Arc::clone(&arrow_schema_ref),
1571            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1572            functional_dependencies: FunctionalDependencies::empty(),
1573        };
1574        let df_schema_ref = Arc::new(df_schema.clone());
1575
1576        {
1577            let arrow_schema = arrow_schema.clone();
1578            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1579
1580            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1581            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1582        }
1583
1584        {
1585            let arrow_schema = arrow_schema.clone();
1586            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1587
1588            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1589            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1590        }
1591
1592        // Now, consume the refs
1593        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1594        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1595    }
1596
1597    fn test_schema_1() -> Schema {
1598        Schema::new(vec![
1599            Field::new("c0", DataType::Boolean, true),
1600            Field::new("c1", DataType::Boolean, true),
1601        ])
1602    }
1603    #[test]
1604    fn test_dfschema_to_schema_conversion() {
1605        let mut a_metadata = HashMap::new();
1606        a_metadata.insert("key".to_string(), "value".to_string());
1607        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1608
1609        let mut b_metadata = HashMap::new();
1610        b_metadata.insert("key".to_string(), "value".to_string());
1611        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1612
1613        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1614
1615        let df_schema = DFSchema {
1616            inner: Arc::clone(&schema),
1617            field_qualifiers: vec![None; schema.fields.len()],
1618            functional_dependencies: FunctionalDependencies::empty(),
1619        };
1620
1621        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1622    }
1623
1624    #[test]
1625    fn test_contain_column() -> Result<()> {
1626        // qualified exists
1627        {
1628            let col = Column::from_qualified_name("t1.c0");
1629            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1630            assert!(schema.is_column_from_schema(&col));
1631        }
1632
1633        // qualified not exists
1634        {
1635            let col = Column::from_qualified_name("t1.c2");
1636            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1637            assert!(!schema.is_column_from_schema(&col));
1638        }
1639
1640        // unqualified exists
1641        {
1642            let col = Column::from_name("c0");
1643            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1644            assert!(schema.is_column_from_schema(&col));
1645        }
1646
1647        // unqualified not exists
1648        {
1649            let col = Column::from_name("c2");
1650            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1651            assert!(!schema.is_column_from_schema(&col));
1652        }
1653
1654        Ok(())
1655    }
1656
1657    #[test]
1658    fn test_datatype_is_logically_equal() {
1659        assert!(DFSchema::datatype_is_logically_equal(
1660            &DataType::Int8,
1661            &DataType::Int8
1662        ));
1663
1664        assert!(!DFSchema::datatype_is_logically_equal(
1665            &DataType::Int8,
1666            &DataType::Int16
1667        ));
1668
1669        // Test lists
1670
1671        // Succeeds if both have the same element type, disregards names and nullability
1672        assert!(DFSchema::datatype_is_logically_equal(
1673            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1674            &DataType::List(Field::new("element", DataType::Int8, false).into())
1675        ));
1676
1677        // Fails if element type is different
1678        assert!(!DFSchema::datatype_is_logically_equal(
1679            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1680            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1681        ));
1682
1683        // Test maps
1684        let map_field = DataType::Map(
1685            Field::new(
1686                "entries",
1687                DataType::Struct(Fields::from(vec![
1688                    Field::new("key", DataType::Int8, false),
1689                    Field::new("value", DataType::Int8, true),
1690                ])),
1691                true,
1692            )
1693            .into(),
1694            true,
1695        );
1696
1697        // Succeeds if both maps have the same key and value types, disregards names and nullability
1698        assert!(DFSchema::datatype_is_logically_equal(
1699            &map_field,
1700            &DataType::Map(
1701                Field::new(
1702                    "pairs",
1703                    DataType::Struct(Fields::from(vec![
1704                        Field::new("one", DataType::Int8, false),
1705                        Field::new("two", DataType::Int8, false)
1706                    ])),
1707                    true
1708                )
1709                .into(),
1710                true
1711            )
1712        ));
1713        // Fails if value type is different
1714        assert!(!DFSchema::datatype_is_logically_equal(
1715            &map_field,
1716            &DataType::Map(
1717                Field::new(
1718                    "entries",
1719                    DataType::Struct(Fields::from(vec![
1720                        Field::new("key", DataType::Int8, false),
1721                        Field::new("value", DataType::Int16, true)
1722                    ])),
1723                    true
1724                )
1725                .into(),
1726                true
1727            )
1728        ));
1729
1730        // Fails if key type is different
1731        assert!(!DFSchema::datatype_is_logically_equal(
1732            &map_field,
1733            &DataType::Map(
1734                Field::new(
1735                    "entries",
1736                    DataType::Struct(Fields::from(vec![
1737                        Field::new("key", DataType::Int16, false),
1738                        Field::new("value", DataType::Int8, true)
1739                    ])),
1740                    true
1741                )
1742                .into(),
1743                true
1744            )
1745        ));
1746
1747        // Test structs
1748
1749        let struct_field = DataType::Struct(Fields::from(vec![
1750            Field::new("a", DataType::Int8, true),
1751            Field::new("b", DataType::Int8, true),
1752        ]));
1753
1754        // Succeeds if both have same names and datatypes, ignores nullability
1755        assert!(DFSchema::datatype_is_logically_equal(
1756            &struct_field,
1757            &DataType::Struct(Fields::from(vec![
1758                Field::new("a", DataType::Int8, false),
1759                Field::new("b", DataType::Int8, true),
1760            ]))
1761        ));
1762
1763        // Fails if field names are different
1764        assert!(!DFSchema::datatype_is_logically_equal(
1765            &struct_field,
1766            &DataType::Struct(Fields::from(vec![
1767                Field::new("x", DataType::Int8, true),
1768                Field::new("y", DataType::Int8, true),
1769            ]))
1770        ));
1771
1772        // Fails if types are different
1773        assert!(!DFSchema::datatype_is_logically_equal(
1774            &struct_field,
1775            &DataType::Struct(Fields::from(vec![
1776                Field::new("a", DataType::Int16, true),
1777                Field::new("b", DataType::Int8, true),
1778            ]))
1779        ));
1780
1781        // Fails if more or less fields
1782        assert!(!DFSchema::datatype_is_logically_equal(
1783            &struct_field,
1784            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1785        ));
1786    }
1787
1788    #[test]
1789    fn test_datatype_is_logically_equivalent_to_dictionary() {
1790        // Dictionary is logically equal to its value type
1791        assert!(DFSchema::datatype_is_logically_equal(
1792            &DataType::Utf8,
1793            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1794        ));
1795    }
1796
1797    #[test]
1798    fn test_datatype_is_semantically_equal() {
1799        assert!(DFSchema::datatype_is_semantically_equal(
1800            &DataType::Int8,
1801            &DataType::Int8
1802        ));
1803
1804        assert!(!DFSchema::datatype_is_semantically_equal(
1805            &DataType::Int8,
1806            &DataType::Int16
1807        ));
1808
1809        // Succeeds if decimal precision and scale are different
1810        assert!(DFSchema::datatype_is_semantically_equal(
1811            &DataType::Decimal32(1, 2),
1812            &DataType::Decimal32(2, 1),
1813        ));
1814
1815        assert!(DFSchema::datatype_is_semantically_equal(
1816            &DataType::Decimal64(1, 2),
1817            &DataType::Decimal64(2, 1),
1818        ));
1819
1820        assert!(DFSchema::datatype_is_semantically_equal(
1821            &DataType::Decimal128(1, 2),
1822            &DataType::Decimal128(2, 1),
1823        ));
1824
1825        assert!(DFSchema::datatype_is_semantically_equal(
1826            &DataType::Decimal256(1, 2),
1827            &DataType::Decimal256(2, 1),
1828        ));
1829
1830        // Any two timestamp types should match
1831        assert!(DFSchema::datatype_is_semantically_equal(
1832            &DataType::Timestamp(
1833                arrow::datatypes::TimeUnit::Microsecond,
1834                Some("UTC".into())
1835            ),
1836            &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1837        ));
1838
1839        // Test lists
1840
1841        // Succeeds if both have the same element type, disregards names and nullability
1842        assert!(DFSchema::datatype_is_semantically_equal(
1843            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1844            &DataType::List(Field::new("element", DataType::Int8, false).into())
1845        ));
1846
1847        // Fails if element type is different
1848        assert!(!DFSchema::datatype_is_semantically_equal(
1849            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1850            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1851        ));
1852
1853        // Test maps
1854        let map_field = DataType::Map(
1855            Field::new(
1856                "entries",
1857                DataType::Struct(Fields::from(vec![
1858                    Field::new("key", DataType::Int8, false),
1859                    Field::new("value", DataType::Int8, true),
1860                ])),
1861                true,
1862            )
1863            .into(),
1864            true,
1865        );
1866
1867        // Succeeds if both maps have the same key and value types, disregards names and nullability
1868        assert!(DFSchema::datatype_is_semantically_equal(
1869            &map_field,
1870            &DataType::Map(
1871                Field::new(
1872                    "pairs",
1873                    DataType::Struct(Fields::from(vec![
1874                        Field::new("one", DataType::Int8, false),
1875                        Field::new("two", DataType::Int8, false)
1876                    ])),
1877                    true
1878                )
1879                .into(),
1880                true
1881            )
1882        ));
1883        // Fails if value type is different
1884        assert!(!DFSchema::datatype_is_semantically_equal(
1885            &map_field,
1886            &DataType::Map(
1887                Field::new(
1888                    "entries",
1889                    DataType::Struct(Fields::from(vec![
1890                        Field::new("key", DataType::Int8, false),
1891                        Field::new("value", DataType::Int16, true)
1892                    ])),
1893                    true
1894                )
1895                .into(),
1896                true
1897            )
1898        ));
1899
1900        // Fails if key type is different
1901        assert!(!DFSchema::datatype_is_semantically_equal(
1902            &map_field,
1903            &DataType::Map(
1904                Field::new(
1905                    "entries",
1906                    DataType::Struct(Fields::from(vec![
1907                        Field::new("key", DataType::Int16, false),
1908                        Field::new("value", DataType::Int8, true)
1909                    ])),
1910                    true
1911                )
1912                .into(),
1913                true
1914            )
1915        ));
1916
1917        // Test structs
1918
1919        let struct_field = DataType::Struct(Fields::from(vec![
1920            Field::new("a", DataType::Int8, true),
1921            Field::new("b", DataType::Int8, true),
1922        ]));
1923
1924        // Succeeds if both have same names and datatypes, ignores nullability
1925        assert!(DFSchema::datatype_is_logically_equal(
1926            &struct_field,
1927            &DataType::Struct(Fields::from(vec![
1928                Field::new("a", DataType::Int8, false),
1929                Field::new("b", DataType::Int8, true),
1930            ]))
1931        ));
1932
1933        // Fails if field names are different
1934        assert!(!DFSchema::datatype_is_logically_equal(
1935            &struct_field,
1936            &DataType::Struct(Fields::from(vec![
1937                Field::new("x", DataType::Int8, true),
1938                Field::new("y", DataType::Int8, true),
1939            ]))
1940        ));
1941
1942        // Fails if types are different
1943        assert!(!DFSchema::datatype_is_logically_equal(
1944            &struct_field,
1945            &DataType::Struct(Fields::from(vec![
1946                Field::new("a", DataType::Int16, true),
1947                Field::new("b", DataType::Int8, true),
1948            ]))
1949        ));
1950
1951        // Fails if more or less fields
1952        assert!(!DFSchema::datatype_is_logically_equal(
1953            &struct_field,
1954            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1955        ));
1956    }
1957
1958    #[test]
1959    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1960        // Dictionary is not semantically equal to its value type
1961        assert!(!DFSchema::datatype_is_semantically_equal(
1962            &DataType::Utf8,
1963            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1964        ));
1965    }
1966
1967    fn test_schema_2() -> Schema {
1968        Schema::new(vec![
1969            Field::new("c100", DataType::Boolean, true),
1970            Field::new("c101", DataType::Boolean, true),
1971        ])
1972    }
1973
1974    fn test_metadata() -> HashMap<String, String> {
1975        test_metadata_n(2)
1976    }
1977
1978    fn test_metadata_n(n: usize) -> HashMap<String, String> {
1979        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1980    }
1981
1982    #[test]
1983    fn test_print_schema_unqualified() {
1984        let schema = DFSchema::from_unqualified_fields(
1985            vec![
1986                Field::new("id", DataType::Int32, false),
1987                Field::new("name", DataType::Utf8, true),
1988                Field::new("age", DataType::Int64, true),
1989                Field::new("active", DataType::Boolean, false),
1990            ]
1991            .into(),
1992            HashMap::new(),
1993        )
1994        .unwrap();
1995
1996        let output = schema.tree_string();
1997
1998        insta::assert_snapshot!(output, @r"
1999        root
2000         |-- id: int32 (nullable = false)
2001         |-- name: utf8 (nullable = true)
2002         |-- age: int64 (nullable = true)
2003         |-- active: boolean (nullable = false)
2004        ");
2005    }
2006
2007    #[test]
2008    fn test_print_schema_qualified() {
2009        let schema = DFSchema::try_from_qualified_schema(
2010            "table1",
2011            &Schema::new(vec![
2012                Field::new("id", DataType::Int32, false),
2013                Field::new("name", DataType::Utf8, true),
2014            ]),
2015        )
2016        .unwrap();
2017
2018        let output = schema.tree_string();
2019
2020        insta::assert_snapshot!(output, @r"
2021        root
2022         |-- table1.id: int32 (nullable = false)
2023         |-- table1.name: utf8 (nullable = true)
2024        ");
2025    }
2026
2027    #[test]
2028    fn test_print_schema_complex_types() {
2029        let struct_field = Field::new(
2030            "address",
2031            DataType::Struct(Fields::from(vec![
2032                Field::new("street", DataType::Utf8, true),
2033                Field::new("city", DataType::Utf8, true),
2034            ])),
2035            true,
2036        );
2037
2038        let list_field = Field::new(
2039            "tags",
2040            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2041            true,
2042        );
2043
2044        let schema = DFSchema::from_unqualified_fields(
2045            vec![
2046                Field::new("id", DataType::Int32, false),
2047                struct_field,
2048                list_field,
2049                Field::new("score", DataType::Decimal128(10, 2), true),
2050            ]
2051            .into(),
2052            HashMap::new(),
2053        )
2054        .unwrap();
2055
2056        let output = schema.tree_string();
2057        insta::assert_snapshot!(output, @r"
2058        root
2059         |-- id: int32 (nullable = false)
2060         |-- address: struct (nullable = true)
2061         |    |-- street: utf8 (nullable = true)
2062         |    |-- city: utf8 (nullable = true)
2063         |-- tags: list (nullable = true)
2064         |    |-- item: utf8 (nullable = true)
2065         |-- score: decimal128(10, 2) (nullable = true)
2066        ");
2067    }
2068
2069    #[test]
2070    fn test_print_schema_empty() {
2071        let schema = DFSchema::empty();
2072        let output = schema.tree_string();
2073        insta::assert_snapshot!(output, @"root");
2074    }
2075
2076    #[test]
2077    fn test_print_schema_deeply_nested_types() {
2078        // Create a deeply nested structure to test indentation and complex type formatting
2079        let inner_struct = Field::new(
2080            "inner",
2081            DataType::Struct(Fields::from(vec![
2082                Field::new("level1", DataType::Utf8, true),
2083                Field::new("level2", DataType::Int32, false),
2084            ])),
2085            true,
2086        );
2087
2088        let nested_list = Field::new(
2089            "nested_list",
2090            DataType::List(Arc::new(Field::new(
2091                "item",
2092                DataType::Struct(Fields::from(vec![
2093                    Field::new("id", DataType::Int64, false),
2094                    Field::new("value", DataType::Float64, true),
2095                ])),
2096                true,
2097            ))),
2098            true,
2099        );
2100
2101        let map_field = Field::new(
2102            "map_data",
2103            DataType::Map(
2104                Arc::new(Field::new(
2105                    "entries",
2106                    DataType::Struct(Fields::from(vec![
2107                        Field::new("key", DataType::Utf8, false),
2108                        Field::new(
2109                            "value",
2110                            DataType::List(Arc::new(Field::new(
2111                                "item",
2112                                DataType::Int32,
2113                                true,
2114                            ))),
2115                            true,
2116                        ),
2117                    ])),
2118                    false,
2119                )),
2120                false,
2121            ),
2122            true,
2123        );
2124
2125        let schema = DFSchema::from_unqualified_fields(
2126            vec![
2127                Field::new("simple_field", DataType::Utf8, true),
2128                inner_struct,
2129                nested_list,
2130                map_field,
2131                Field::new(
2132                    "timestamp_field",
2133                    DataType::Timestamp(
2134                        arrow::datatypes::TimeUnit::Microsecond,
2135                        Some("UTC".into()),
2136                    ),
2137                    false,
2138                ),
2139            ]
2140            .into(),
2141            HashMap::new(),
2142        )
2143        .unwrap();
2144
2145        let output = schema.tree_string();
2146
2147        insta::assert_snapshot!(output, @r"
2148        root
2149         |-- simple_field: utf8 (nullable = true)
2150         |-- inner: struct (nullable = true)
2151         |    |-- level1: utf8 (nullable = true)
2152         |    |-- level2: int32 (nullable = false)
2153         |-- nested_list: list (nullable = true)
2154         |    |-- item: struct (nullable = true)
2155         |    |    |-- id: int64 (nullable = false)
2156         |    |    |-- value: float64 (nullable = true)
2157         |-- map_data: map (nullable = true)
2158         |    |-- key: utf8 (nullable = false)
2159         |    |-- value: list (nullable = true)
2160         |    |    |-- item: int32 (nullable = true)
2161         |-- timestamp_field: timestamp (UTC) (nullable = false)
2162        ");
2163    }
2164
2165    #[test]
2166    fn test_print_schema_mixed_qualified_unqualified() {
2167        // Test a schema with mixed qualified and unqualified fields
2168        let schema = DFSchema::new_with_metadata(
2169            vec![
2170                (
2171                    Some("table1".into()),
2172                    Arc::new(Field::new("id", DataType::Int32, false)),
2173                ),
2174                (None, Arc::new(Field::new("name", DataType::Utf8, true))),
2175                (
2176                    Some("table2".into()),
2177                    Arc::new(Field::new("score", DataType::Float64, true)),
2178                ),
2179                (
2180                    None,
2181                    Arc::new(Field::new("active", DataType::Boolean, false)),
2182                ),
2183            ],
2184            HashMap::new(),
2185        )
2186        .unwrap();
2187
2188        let output = schema.tree_string();
2189
2190        insta::assert_snapshot!(output, @r"
2191        root
2192         |-- table1.id: int32 (nullable = false)
2193         |-- name: utf8 (nullable = true)
2194         |-- table2.score: float64 (nullable = true)
2195         |-- active: boolean (nullable = false)
2196        ");
2197    }
2198
2199    #[test]
2200    fn test_print_schema_array_of_map() {
2201        // Test the specific example from user feedback: array of map
2202        let map_field = Field::new(
2203            "entries",
2204            DataType::Struct(Fields::from(vec![
2205                Field::new("key", DataType::Utf8, false),
2206                Field::new("value", DataType::Utf8, false),
2207            ])),
2208            false,
2209        );
2210
2211        let array_of_map_field = Field::new(
2212            "array_map_field",
2213            DataType::List(Arc::new(Field::new(
2214                "item",
2215                DataType::Map(Arc::new(map_field), false),
2216                false,
2217            ))),
2218            false,
2219        );
2220
2221        let schema = DFSchema::from_unqualified_fields(
2222            vec![array_of_map_field].into(),
2223            HashMap::new(),
2224        )
2225        .unwrap();
2226
2227        let output = schema.tree_string();
2228
2229        insta::assert_snapshot!(output, @r"
2230        root
2231         |-- array_map_field: list (nullable = false)
2232         |    |-- item: map (nullable = false)
2233         |    |    |-- key: utf8 (nullable = false)
2234         |    |    |-- value: utf8 (nullable = false)
2235        ");
2236    }
2237
2238    #[test]
2239    fn test_print_schema_complex_type_combinations() {
2240        // Test various combinations of list, struct, and map types
2241
2242        // List of structs
2243        let list_of_structs = Field::new(
2244            "list_of_structs",
2245            DataType::List(Arc::new(Field::new(
2246                "item",
2247                DataType::Struct(Fields::from(vec![
2248                    Field::new("id", DataType::Int32, false),
2249                    Field::new("name", DataType::Utf8, true),
2250                    Field::new("score", DataType::Float64, true),
2251                ])),
2252                true,
2253            ))),
2254            true,
2255        );
2256
2257        // Struct containing lists
2258        let struct_with_lists = Field::new(
2259            "struct_with_lists",
2260            DataType::Struct(Fields::from(vec![
2261                Field::new(
2262                    "tags",
2263                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2264                    true,
2265                ),
2266                Field::new(
2267                    "scores",
2268                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
2269                    false,
2270                ),
2271                Field::new("metadata", DataType::Utf8, true),
2272            ])),
2273            false,
2274        );
2275
2276        // Map with struct values
2277        let map_with_struct_values = Field::new(
2278            "map_with_struct_values",
2279            DataType::Map(
2280                Arc::new(Field::new(
2281                    "entries",
2282                    DataType::Struct(Fields::from(vec![
2283                        Field::new("key", DataType::Utf8, false),
2284                        Field::new(
2285                            "value",
2286                            DataType::Struct(Fields::from(vec![
2287                                Field::new("count", DataType::Int64, false),
2288                                Field::new("active", DataType::Boolean, true),
2289                            ])),
2290                            true,
2291                        ),
2292                    ])),
2293                    false,
2294                )),
2295                false,
2296            ),
2297            true,
2298        );
2299
2300        // List of maps
2301        let list_of_maps = Field::new(
2302            "list_of_maps",
2303            DataType::List(Arc::new(Field::new(
2304                "item",
2305                DataType::Map(
2306                    Arc::new(Field::new(
2307                        "entries",
2308                        DataType::Struct(Fields::from(vec![
2309                            Field::new("key", DataType::Utf8, false),
2310                            Field::new("value", DataType::Int32, true),
2311                        ])),
2312                        false,
2313                    )),
2314                    false,
2315                ),
2316                true,
2317            ))),
2318            true,
2319        );
2320
2321        // Deeply nested: struct containing list of structs containing maps
2322        let deeply_nested = Field::new(
2323            "deeply_nested",
2324            DataType::Struct(Fields::from(vec![
2325                Field::new("level1", DataType::Utf8, true),
2326                Field::new(
2327                    "level2",
2328                    DataType::List(Arc::new(Field::new(
2329                        "item",
2330                        DataType::Struct(Fields::from(vec![
2331                            Field::new("id", DataType::Int32, false),
2332                            Field::new(
2333                                "properties",
2334                                DataType::Map(
2335                                    Arc::new(Field::new(
2336                                        "entries",
2337                                        DataType::Struct(Fields::from(vec![
2338                                            Field::new("key", DataType::Utf8, false),
2339                                            Field::new("value", DataType::Float64, true),
2340                                        ])),
2341                                        false,
2342                                    )),
2343                                    false,
2344                                ),
2345                                true,
2346                            ),
2347                        ])),
2348                        true,
2349                    ))),
2350                    false,
2351                ),
2352            ])),
2353            true,
2354        );
2355
2356        let schema = DFSchema::from_unqualified_fields(
2357            vec![
2358                list_of_structs,
2359                struct_with_lists,
2360                map_with_struct_values,
2361                list_of_maps,
2362                deeply_nested,
2363            ]
2364            .into(),
2365            HashMap::new(),
2366        )
2367        .unwrap();
2368
2369        let output = schema.tree_string();
2370
2371        insta::assert_snapshot!(output, @r"
2372        root
2373         |-- list_of_structs: list (nullable = true)
2374         |    |-- item: struct (nullable = true)
2375         |    |    |-- id: int32 (nullable = false)
2376         |    |    |-- name: utf8 (nullable = true)
2377         |    |    |-- score: float64 (nullable = true)
2378         |-- struct_with_lists: struct (nullable = false)
2379         |    |-- tags: list (nullable = true)
2380         |    |    |-- item: utf8 (nullable = true)
2381         |    |-- scores: list (nullable = false)
2382         |    |    |-- item: int32 (nullable = true)
2383         |    |-- metadata: utf8 (nullable = true)
2384         |-- map_with_struct_values: map (nullable = true)
2385         |    |-- key: utf8 (nullable = false)
2386         |    |-- value: struct (nullable = true)
2387         |    |    |-- count: int64 (nullable = false)
2388         |    |    |-- active: boolean (nullable = true)
2389         |-- list_of_maps: list (nullable = true)
2390         |    |-- item: map (nullable = true)
2391         |    |    |-- key: utf8 (nullable = false)
2392         |    |    |-- value: int32 (nullable = false)
2393         |-- deeply_nested: struct (nullable = true)
2394         |    |-- level1: utf8 (nullable = true)
2395         |    |-- level2: list (nullable = false)
2396         |    |    |-- item: struct (nullable = true)
2397         |    |    |    |-- id: int32 (nullable = false)
2398         |    |    |    |-- properties: map (nullable = true)
2399         |    |    |    |    |-- key: utf8 (nullable = false)
2400         |    |    |    |    |-- value: float64 (nullable = false)
2401        ");
2402    }
2403
2404    #[test]
2405    fn test_print_schema_edge_case_types() {
2406        // Test edge cases and special types
2407        let schema = DFSchema::from_unqualified_fields(
2408            vec![
2409                Field::new("null_field", DataType::Null, true),
2410                Field::new("binary_field", DataType::Binary, false),
2411                Field::new("large_binary", DataType::LargeBinary, true),
2412                Field::new("large_utf8", DataType::LargeUtf8, false),
2413                Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
2414                Field::new(
2415                    "fixed_size_list",
2416                    DataType::FixedSizeList(
2417                        Arc::new(Field::new("item", DataType::Int32, true)),
2418                        5,
2419                    ),
2420                    false,
2421                ),
2422                Field::new("decimal32", DataType::Decimal32(9, 4), true),
2423                Field::new("decimal64", DataType::Decimal64(9, 4), true),
2424                Field::new("decimal128", DataType::Decimal128(18, 4), true),
2425                Field::new("decimal256", DataType::Decimal256(38, 10), false),
2426                Field::new("date32", DataType::Date32, true),
2427                Field::new("date64", DataType::Date64, false),
2428                Field::new(
2429                    "time32_seconds",
2430                    DataType::Time32(arrow::datatypes::TimeUnit::Second),
2431                    true,
2432                ),
2433                Field::new(
2434                    "time64_nanoseconds",
2435                    DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
2436                    false,
2437                ),
2438            ]
2439            .into(),
2440            HashMap::new(),
2441        )
2442        .unwrap();
2443
2444        let output = schema.tree_string();
2445
2446        insta::assert_snapshot!(output, @r"
2447        root
2448         |-- null_field: null (nullable = true)
2449         |-- binary_field: binary (nullable = false)
2450         |-- large_binary: large_binary (nullable = true)
2451         |-- large_utf8: large_utf8 (nullable = false)
2452         |-- fixed_size_binary: fixed_size_binary (nullable = true)
2453         |-- fixed_size_list: fixed size list (nullable = false)
2454         |    |-- item: int32 (nullable = true)
2455         |-- decimal32: decimal32(9, 4) (nullable = true)
2456         |-- decimal64: decimal64(9, 4) (nullable = true)
2457         |-- decimal128: decimal128(18, 4) (nullable = true)
2458         |-- decimal256: decimal256(38, 10) (nullable = false)
2459         |-- date32: date32 (nullable = true)
2460         |-- date64: date64 (nullable = false)
2461         |-- time32_seconds: time32 (nullable = true)
2462         |-- time64_nanoseconds: time64 (nullable = false)
2463        ");
2464    }
2465}