Skip to main content

datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::{Arc, LazyLock};
25
26use crate::error::{_plan_err, _schema_err, DataFusionError, Result};
27use crate::{
28    Column, FunctionalDependencies, SchemaError, TableReference, field_not_found,
29    unqualified_field_not_found,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
/// A shared, reference-counted handle (`Arc`) to a [DFSchema].
pub type DFSchemaRef = Arc<DFSchema>;
39
/// DFSchema wraps an Arrow schema and adds a relation (table) name.
///
/// The schema may hold the fields across multiple tables. Some fields may be
/// qualified and some unqualified. A qualified field is a field that has a
/// relation name associated with it.
///
/// Unqualified fields must be unique not only amongst themselves, but also must
/// have a distinct name from any qualified field names. This allows finding a
/// qualified field by name to be possible, so long as there aren't multiple
/// qualified fields with the same name.
///
/// # See Also
/// * [DFSchemaRef], an alias to `Arc<DFSchema>`
/// * [DataTypeExt], common methods for working with Arrow [DataType]s
/// * [FieldExt], extension methods for working with Arrow [Field]s
///
/// [DataTypeExt]: crate::datatype::DataTypeExt
/// [FieldExt]: crate::datatype::FieldExt
///
/// # Creating qualified schemas
///
/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
/// an Arrow schema.
///
/// ```rust
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_common::{Column, DFSchema};
///
/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
///
/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
/// let column = Column::from_qualified_name("t1.c1");
/// assert!(df_schema.has_column(&column));
///
/// // Can also access qualified fields with unqualified name, if it's unambiguous
/// let column = Column::from_qualified_name("c1");
/// assert!(df_schema.has_column(&column));
/// ```
///
/// # Creating unqualified schemas
///
/// Create an unqualified schema using TryFrom:
///
/// ```rust
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_common::{Column, DFSchema};
///
/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
///
/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
/// let column = Column::new_unqualified("c1");
/// assert!(df_schema.has_column(&column));
/// ```
///
/// # Converting back to Arrow schema
///
/// Use [DFSchema::as_arrow] to borrow the wrapped Arrow schema (the
/// qualifiers are not part of it):
///
/// ```rust
/// use arrow::datatypes::{Field, Schema};
/// use datafusion_common::DFSchema;
/// use std::collections::HashMap;
///
/// let df_schema = DFSchema::from_unqualified_fields(
///     vec![Field::new("c1", arrow::datatypes::DataType::Int32, false)].into(),
///     HashMap::new(),
/// )
/// .unwrap();
/// let schema: &Schema = df_schema.as_arrow();
/// assert_eq!(schema.fields().len(), 1);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Inner Arrow schema reference. Holds the fields and the schema-level
    /// metadata, but no qualifier information.
    inner: SchemaRef,
    /// Optional qualifiers for each column in this schema. In the same order as
    /// the `self.inner.fields()`; entry `i` qualifies field `i`, and `None`
    /// marks an unqualified field.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}
121
122impl DFSchema {
123    /// Creates an empty `DFSchema`
124    pub fn empty() -> Self {
125        Self {
126            inner: Arc::new(Schema::new([])),
127            field_qualifiers: vec![],
128            functional_dependencies: FunctionalDependencies::empty(),
129        }
130    }
131
132    /// Returns a reference to a shared empty [`DFSchema`].
133    pub fn empty_ref() -> &'static DFSchemaRef {
134        static EMPTY: LazyLock<DFSchemaRef> =
135            LazyLock::new(|| Arc::new(DFSchema::empty()));
136        &EMPTY
137    }
138
139    /// Return a reference to the inner Arrow [`Schema`]
140    ///
141    /// Note this does not have the qualifier information
142    pub fn as_arrow(&self) -> &Schema {
143        self.inner.as_ref()
144    }
145
146    /// Return a reference to the inner Arrow [`SchemaRef`]
147    ///
148    /// Note this does not have the qualifier information
149    pub fn inner(&self) -> &SchemaRef {
150        &self.inner
151    }
152
153    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
154    pub fn new_with_metadata(
155        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
156        metadata: HashMap<String, String>,
157    ) -> Result<Self> {
158        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
159            qualified_fields.into_iter().unzip();
160
161        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
162
163        let dfschema = Self {
164            inner: schema,
165            field_qualifiers: qualifiers,
166            functional_dependencies: FunctionalDependencies::empty(),
167        };
168        dfschema.check_names()?;
169        Ok(dfschema)
170    }
171
172    /// Create a new `DFSchema` from a list of Arrow [Field]s
173    pub fn from_unqualified_fields(
174        fields: Fields,
175        metadata: HashMap<String, String>,
176    ) -> Result<Self> {
177        let field_count = fields.len();
178        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
179        let dfschema = Self {
180            inner: schema,
181            field_qualifiers: vec![None; field_count],
182            functional_dependencies: FunctionalDependencies::empty(),
183        };
184        dfschema.check_names()?;
185        Ok(dfschema)
186    }
187
188    /// Create a `DFSchema` from an Arrow schema and a given qualifier
189    ///
190    /// To create a schema from an Arrow schema without a qualifier, use
191    /// `DFSchema::try_from`.
192    pub fn try_from_qualified_schema(
193        qualifier: impl Into<TableReference>,
194        schema: &Schema,
195    ) -> Result<Self> {
196        let qualifier = qualifier.into();
197        let schema = DFSchema {
198            inner: schema.clone().into(),
199            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
200            functional_dependencies: FunctionalDependencies::empty(),
201        };
202        schema.check_names()?;
203        Ok(schema)
204    }
205
206    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
207    pub fn from_field_specific_qualified_schema(
208        qualifiers: Vec<Option<TableReference>>,
209        schema: &SchemaRef,
210    ) -> Result<Self> {
211        let dfschema = Self {
212            inner: Arc::clone(schema),
213            field_qualifiers: qualifiers,
214            functional_dependencies: FunctionalDependencies::empty(),
215        };
216        dfschema.check_names()?;
217        Ok(dfschema)
218    }
219
220    /// Return the same schema, where all fields have a given qualifier.
221    pub fn with_field_specific_qualified_schema(
222        &self,
223        qualifiers: Vec<Option<TableReference>>,
224    ) -> Result<Self> {
225        if qualifiers.len() != self.fields().len() {
226            return _plan_err!(
227                "Number of qualifiers must match number of fields. Expected {}, got {}",
228                self.fields().len(),
229                qualifiers.len()
230            );
231        }
232        Ok(DFSchema {
233            inner: Arc::clone(&self.inner),
234            field_qualifiers: qualifiers,
235            functional_dependencies: self.functional_dependencies.clone(),
236        })
237    }
238
239    /// Check if the schema have some fields with the same name
240    pub fn check_names(&self) -> Result<()> {
241        let mut qualified_names = BTreeSet::new();
242        let mut unqualified_names = BTreeSet::new();
243
244        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
245            if let Some(qualifier) = qualifier {
246                if !qualified_names.insert((qualifier, field.name())) {
247                    return _schema_err!(SchemaError::DuplicateQualifiedField {
248                        qualifier: Box::new(qualifier.clone()),
249                        name: field.name().to_string(),
250                    });
251                }
252            } else if !unqualified_names.insert(field.name()) {
253                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
254                    name: field.name().to_string()
255                });
256            }
257        }
258
259        for (qualifier, name) in qualified_names {
260            if unqualified_names.contains(name) {
261                return _schema_err!(SchemaError::AmbiguousReference {
262                    field: Box::new(Column::new(Some(qualifier.clone()), name))
263                });
264            }
265        }
266        Ok(())
267    }
268
269    /// Assigns functional dependencies.
270    pub fn with_functional_dependencies(
271        mut self,
272        functional_dependencies: FunctionalDependencies,
273    ) -> Result<Self> {
274        if functional_dependencies.is_valid(self.inner.fields.len()) {
275            self.functional_dependencies = functional_dependencies;
276            Ok(self)
277        } else {
278            _plan_err!(
279                "Invalid functional dependency: {:?}",
280                functional_dependencies
281            )
282        }
283    }
284
285    /// Create a new schema that contains the fields from this schema followed by the fields
286    /// from the supplied schema. An error will be returned if there are duplicate field names.
287    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
288        let mut schema_builder = SchemaBuilder::new();
289        schema_builder.extend(self.inner.fields().iter().cloned());
290        schema_builder.extend(schema.fields().iter().cloned());
291        let new_schema = schema_builder.finish();
292
293        let mut new_metadata = self.inner.metadata.clone();
294        new_metadata.extend(schema.inner.metadata.clone());
295        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
296
297        let mut new_qualifiers = self.field_qualifiers.clone();
298        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
299
300        let new_self = Self {
301            inner: Arc::new(new_schema_with_metadata),
302            field_qualifiers: new_qualifiers,
303            functional_dependencies: FunctionalDependencies::empty(),
304        };
305        new_self.check_names()?;
306        Ok(new_self)
307    }
308
309    /// Modify this schema by appending the fields from the supplied schema, ignoring any
310    /// duplicate fields.
311    ///
312    /// ## Merge Precedence
313    ///
314    /// **Schema-level metadata**: Metadata from both schemas is merged.
315    /// If both schemas have the same metadata key, the value from the `other_schema` parameter takes precedence.
316    ///
317    /// **Field-level merging**: Only non-duplicate fields are added. This means that the
318    /// `self` fields will always take precedence over the `other_schema` fields.
319    /// Duplicate field detection is based on:
320    /// - For qualified fields: both qualifier and field name must match
321    /// - For unqualified fields: only field name needs to match
322    ///
323    /// Take note how the precedence for fields & metadata merging differs;
324    /// merging prefers fields from `self` but prefers metadata from `other_schema`.
325    pub fn merge(&mut self, other_schema: &DFSchema) {
326        if other_schema.inner.fields.is_empty() {
327            return;
328        }
329
330        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
331            self.iter().collect();
332        let self_unqualified_names: HashSet<&str> = self
333            .inner
334            .fields
335            .iter()
336            .map(|field| field.name().as_str())
337            .collect();
338
339        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
340        let mut qualifiers = Vec::new();
341        for (qualifier, field) in other_schema.iter() {
342            // skip duplicate columns
343            let duplicated_field = match qualifier {
344                Some(q) => self_fields.contains(&(Some(q), field)),
345                // for unqualified columns, check as unqualified name
346                None => self_unqualified_names.contains(field.name().as_str()),
347            };
348            if !duplicated_field {
349                schema_builder.push(Arc::clone(field));
350                qualifiers.push(qualifier.cloned());
351            }
352        }
353        let mut metadata = self.inner.metadata.clone();
354        metadata.extend(other_schema.inner.metadata.clone());
355
356        let finished = schema_builder.finish();
357        let finished_with_metadata = finished.with_metadata(metadata);
358        self.inner = finished_with_metadata.into();
359        self.field_qualifiers.extend(qualifiers);
360    }
361
362    /// Get a list of fields for this schema
363    pub fn fields(&self) -> &Fields {
364        &self.inner.fields
365    }
366
367    /// Returns a reference to [`FieldRef`] for a column at specific index
368    /// within the schema.
369    ///
370    /// See also [Self::qualified_field] to get both qualifier and field
371    pub fn field(&self, i: usize) -> &FieldRef {
372        &self.inner.fields[i]
373    }
374
375    /// Returns the qualifier (if any) and [`FieldRef`] for a column at specific
376    /// index within the schema.
377    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &FieldRef) {
378        (self.field_qualifiers[i].as_ref(), self.field(i))
379    }
380
381    pub fn index_of_column_by_name(
382        &self,
383        qualifier: Option<&TableReference>,
384        name: &str,
385    ) -> Option<usize> {
386        let mut matches = self
387            .iter()
388            .enumerate()
389            .filter(|(_, (q, f))| match (qualifier, q) {
390                // field to lookup is qualified.
391                // current field is qualified and not shared between relations, compare both
392                // qualifier and name.
393                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
394                // field to lookup is qualified but current field is unqualified.
395                (Some(_), None) => false,
396                // field to lookup is unqualified, no need to compare qualifier
397                (None, Some(_)) | (None, None) => f.name() == name,
398            })
399            .map(|(idx, _)| idx);
400        matches.next()
401    }
402
403    /// Find the index of the column with the given qualifier and name,
404    /// returning `None` if not found
405    ///
406    /// See [Self::index_of_column] for a version that returns an error if the
407    /// column is not found
408    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
409        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
410    }
411
412    /// Find the index of the column with the given qualifier and name,
413    /// returning `Err` if not found
414    ///
415    /// See [Self::maybe_index_of_column] for a version that returns `None` if
416    /// the column is not found
417    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
418        self.maybe_index_of_column(col)
419            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
420    }
421
422    /// Check if the column is in the current schema
423    pub fn is_column_from_schema(&self, col: &Column) -> bool {
424        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
425            .is_some()
426    }
427
428    /// Find the [`FieldRef`] with the given name and optional qualifier
429    pub fn field_with_name(
430        &self,
431        qualifier: Option<&TableReference>,
432        name: &str,
433    ) -> Result<&FieldRef> {
434        if let Some(qualifier) = qualifier {
435            self.field_with_qualified_name(qualifier, name)
436        } else {
437            self.field_with_unqualified_name(name)
438        }
439    }
440
441    /// Find the qualified field with the given name
442    pub fn qualified_field_with_name(
443        &self,
444        qualifier: Option<&TableReference>,
445        name: &str,
446    ) -> Result<(Option<&TableReference>, &FieldRef)> {
447        if let Some(qualifier) = qualifier {
448            let idx = self
449                .index_of_column_by_name(Some(qualifier), name)
450                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
451            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
452        } else {
453            self.qualified_field_with_unqualified_name(name)
454        }
455    }
456
457    /// Find all fields having the given qualifier
458    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&FieldRef> {
459        self.iter()
460            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
461            .map(|(_, f)| f)
462            .collect()
463    }
464
465    /// Find all fields indices having the given qualifier
466    pub fn fields_indices_with_qualified(
467        &self,
468        qualifier: &TableReference,
469    ) -> Vec<usize> {
470        self.iter()
471            .enumerate()
472            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
473            .collect()
474    }
475
476    /// Find all fields that match the given name
477    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&FieldRef> {
478        self.fields()
479            .iter()
480            .filter(|field| field.name() == name)
481            .collect()
482    }
483
484    /// Find all fields that match the given name and return them with their qualifier
485    pub fn qualified_fields_with_unqualified_name(
486        &self,
487        name: &str,
488    ) -> Vec<(Option<&TableReference>, &FieldRef)> {
489        self.iter()
490            .filter(|(_, field)| field.name() == name)
491            .collect()
492    }
493
494    /// Find all fields that match the given name and convert to column
495    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
496        self.iter()
497            .filter(|(_, field)| field.name() == name)
498            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
499            .collect()
500    }
501
502    /// Return all `Column`s for the schema
503    pub fn columns(&self) -> Vec<Column> {
504        self.iter()
505            .map(|(qualifier, field)| {
506                Column::new(qualifier.cloned(), field.name().clone())
507            })
508            .collect()
509    }
510
511    /// Find the qualified field with the given unqualified name
512    pub fn qualified_field_with_unqualified_name(
513        &self,
514        name: &str,
515    ) -> Result<(Option<&TableReference>, &FieldRef)> {
516        let matches = self.qualified_fields_with_unqualified_name(name);
517        match matches.len() {
518            0 => Err(unqualified_field_not_found(name, self)),
519            1 => Ok((matches[0].0, matches[0].1)),
520            _ => {
521                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
522                // Because name may generate from Alias/... . It means that it don't own qualifier.
523                // For example:
524                //             Join on id = b.id
525                // Project a.id as id   TableScan b id
526                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
527                // one field without qualifier, we should return it.
528                let fields_without_qualifier = matches
529                    .iter()
530                    .filter(|(q, _)| q.is_none())
531                    .collect::<Vec<_>>();
532                if fields_without_qualifier.len() == 1 {
533                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
534                } else {
535                    _schema_err!(SchemaError::AmbiguousReference {
536                        field: Box::new(Column::new_unqualified(name.to_string()))
537                    })
538                }
539            }
540        }
541    }
542
543    /// Find the field with the given name
544    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&FieldRef> {
545        self.qualified_field_with_unqualified_name(name)
546            .map(|(_, field)| field)
547    }
548
549    /// Find the field with the given qualified name
550    pub fn field_with_qualified_name(
551        &self,
552        qualifier: &TableReference,
553        name: &str,
554    ) -> Result<&FieldRef> {
555        let idx = self
556            .index_of_column_by_name(Some(qualifier), name)
557            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
558
559        Ok(self.field(idx))
560    }
561
562    /// Find the field with the given qualified column
563    pub fn qualified_field_from_column(
564        &self,
565        column: &Column,
566    ) -> Result<(Option<&TableReference>, &FieldRef)> {
567        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
568    }
569
570    /// Find if the field exists with the given name
571    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
572        self.fields().iter().any(|field| field.name() == name)
573    }
574
575    /// Find if the field exists with the given qualified name
576    pub fn has_column_with_qualified_name(
577        &self,
578        qualifier: &TableReference,
579        name: &str,
580    ) -> bool {
581        self.iter()
582            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
583    }
584
585    /// Find if the field exists with the given qualified column
586    pub fn has_column(&self, column: &Column) -> bool {
587        match &column.relation {
588            Some(r) => self.has_column_with_qualified_name(r, &column.name),
589            None => self.has_column_with_unqualified_name(&column.name),
590        }
591    }
592
593    /// Check to see if unqualified field names matches field names in Arrow schema
594    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
595        self.inner
596            .fields
597            .iter()
598            .zip(arrow_schema.fields().iter())
599            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
600    }
601
602    /// Check to see if fields in 2 Arrow schemas are compatible
603    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
604    pub fn check_arrow_schema_type_compatible(
605        &self,
606        arrow_schema: &Schema,
607    ) -> Result<()> {
608        let self_arrow_schema = self.as_arrow();
609        self_arrow_schema
610            .fields()
611            .iter()
612            .zip(arrow_schema.fields().iter())
613            .try_for_each(|(l_field, r_field)| {
614                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
615                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
616                                r_field.name(),
617                                r_field.data_type(),
618                                l_field.name(),
619                                l_field.data_type())
620                } else {
621                    Ok(())
622                }
623            })
624    }
625
626    /// Returns true if the two schemas have the same qualified named
627    /// fields with logically equivalent data types. Returns false otherwise.
628    ///
629    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
630    /// equivalence checking.
631    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
632        if self.fields().len() != other.fields().len() {
633            return false;
634        }
635        let self_fields = self.iter();
636        let other_fields = other.iter();
637        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
638            q1 == q2
639                && f1.name() == f2.name()
640                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
641        })
642    }
643
644    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
645    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
646        self.has_equivalent_names_and_types(other).is_ok()
647    }
648
649    /// Returns Ok if the two schemas have the same qualified named
650    /// fields with the compatible data types.
651    ///
652    /// Returns an `Err` with a message otherwise.
653    ///
654    /// This is a specialized version of Eq that ignores differences in
655    /// nullability and metadata.
656    ///
657    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
658    /// logical type checking, which for example would consider a dictionary
659    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
660    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
661        // case 1 : schema length mismatch
662        if self.fields().len() != other.fields().len() {
663            _plan_err!(
664                "Schema mismatch: the schema length are not same \
665            Expected schema length: {}, got: {}",
666                self.fields().len(),
667                other.fields().len()
668            )
669        } else {
670            // case 2 : schema length match, but fields mismatch
671            // check if the fields name are the same and have the same data types
672            self.fields()
673                .iter()
674                .zip(other.fields().iter())
675                .try_for_each(|(f1, f2)| {
676                    if f1.name() != f2.name()
677                        || (!DFSchema::datatype_is_semantically_equal(
678                            f1.data_type(),
679                            f2.data_type(),
680                        ))
681                    {
682                        _plan_err!(
683                            "Schema mismatch: Expected field '{}' with type {}, \
684                            but got '{}' with type {}.",
685                            f1.name(),
686                            f1.data_type(),
687                            f2.name(),
688                            f2.data_type()
689                        )
690                    } else {
691                        Ok(())
692                    }
693                })
694        }
695    }
696
697    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
698    /// than datatype_is_semantically_equal in that different representations of same data can be
699    /// logically but not semantically equivalent. Semantically equivalent types are always also
700    /// logically equivalent. For example:
701    /// - a Dictionary<K,V> type is logically equal to a plain V type
702    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
703    /// - Utf8 and Utf8View are logically equal
704    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
705        // check nested fields
706        match (dt1, dt2) {
707            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
708                Self::datatype_is_logically_equal(v1.as_ref(), v2.as_ref())
709            }
710            (DataType::Dictionary(_, v1), othertype)
711            | (othertype, DataType::Dictionary(_, v1)) => {
712                Self::datatype_is_logically_equal(v1.as_ref(), othertype)
713            }
714            (DataType::List(f1), DataType::List(f2))
715            | (DataType::LargeList(f1), DataType::LargeList(f2))
716            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
717                // Don't compare the names of the technical inner field
718                // Usually "item" but that's not mandated
719                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
720            }
721            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
722                // Don't compare the names of the technical inner fields
723                // Usually "entries", "key", "value" but that's not mandated
724                match (f1.data_type(), f2.data_type()) {
725                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
726                        f1_inner.len() == f2_inner.len()
727                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
728                                Self::datatype_is_logically_equal(
729                                    f1.data_type(),
730                                    f2.data_type(),
731                                )
732                            })
733                    }
734                    _ => panic!("Map type should have an inner struct field"),
735                }
736            }
737            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
738                let iter1 = fields1.iter();
739                let iter2 = fields2.iter();
740                fields1.len() == fields2.len() &&
741                        // all fields have to be the same
742                    iter1
743                    .zip(iter2)
744                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
745            }
746            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
747                let iter1 = fields1.iter();
748                let iter2 = fields2.iter();
749                fields1.len() == fields2.len() &&
750                    // all fields have to be the same
751                    iter1
752                        .zip(iter2)
753                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
754            }
755            // Utf8 and Utf8View are logically equivalent
756            (DataType::Utf8, DataType::Utf8View) => true,
757            (DataType::Utf8View, DataType::Utf8) => true,
758            _ => Self::datatype_is_semantically_equal(dt1, dt2),
759        }
760    }
761
762    /// Returns true of two [`DataType`]s are semantically equal (same
763    /// name and type), ignoring both metadata and nullability, decimal precision/scale,
764    /// and timezone time units/timezones.
765    ///
766    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
767    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
768        // check nested fields
769        match (dt1, dt2) {
770            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
771                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
772                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
773            }
774            (DataType::List(f1), DataType::List(f2))
775            | (DataType::LargeList(f1), DataType::LargeList(f2))
776            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
777                // Don't compare the names of the technical inner field
778                // Usually "item" but that's not mandated
779                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
780            }
781            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
782                // Don't compare the names of the technical inner fields
783                // Usually "entries", "key", "value" but that's not mandated
784                match (f1.data_type(), f2.data_type()) {
785                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
786                        f1_inner.len() == f2_inner.len()
787                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
788                                Self::datatype_is_semantically_equal(
789                                    f1.data_type(),
790                                    f2.data_type(),
791                                )
792                            })
793                    }
794                    _ => panic!("Map type should have an inner struct field"),
795                }
796            }
797            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
798                let iter1 = fields1.iter();
799                let iter2 = fields2.iter();
800                fields1.len() == fields2.len() &&
801                        // all fields have to be the same
802                    iter1
803                    .zip(iter2)
804                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
805            }
806            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
807                let iter1 = fields1.iter();
808                let iter2 = fields2.iter();
809                fields1.len() == fields2.len() &&
810                    // all fields have to be the same
811                    iter1
812                        .zip(iter2)
813                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
814            }
815            (
816                DataType::Decimal32(_l_precision, _l_scale),
817                DataType::Decimal32(_r_precision, _r_scale),
818            ) => true,
819            (
820                DataType::Decimal64(_l_precision, _l_scale),
821                DataType::Decimal64(_r_precision, _r_scale),
822            ) => true,
823            (
824                DataType::Decimal128(_l_precision, _l_scale),
825                DataType::Decimal128(_r_precision, _r_scale),
826            ) => true,
827            (
828                DataType::Decimal256(_l_precision, _l_scale),
829                DataType::Decimal256(_r_precision, _r_scale),
830            ) => true,
831            (
832                DataType::Timestamp(_l_time_unit, _l_timezone),
833                DataType::Timestamp(_r_time_unit, _r_timezone),
834            ) => true,
835            _ => dt1 == dt2,
836        }
837    }
838
839    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
840        f1.name() == f2.name()
841            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
842    }
843
844    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
845        f1.name() == f2.name()
846            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
847    }
848
849    /// Strip all field qualifier in schema
850    pub fn strip_qualifiers(self) -> Self {
851        DFSchema {
852            field_qualifiers: vec![None; self.inner.fields.len()],
853            inner: self.inner,
854            functional_dependencies: self.functional_dependencies,
855        }
856    }
857
858    /// Replace all field qualifier with new value in schema
859    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
860        let qualifier = qualifier.into();
861        DFSchema {
862            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
863            inner: self.inner,
864            functional_dependencies: self.functional_dependencies,
865        }
866    }
867
868    /// Get list of fully-qualified field names in this schema
869    pub fn field_names(&self) -> Vec<String> {
870        self.iter()
871            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
872            .collect::<Vec<_>>()
873    }
874
875    /// Get metadata of this schema
876    pub fn metadata(&self) -> &HashMap<String, String> {
877        &self.inner.metadata
878    }
879
880    /// Get functional dependencies
881    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
882        &self.functional_dependencies
883    }
884
885    /// Iterate over the qualifiers and fields in the DFSchema
886    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
887        self.field_qualifiers
888            .iter()
889            .zip(self.inner.fields().iter())
890            .map(|(qualifier, field)| (qualifier.as_ref(), field))
891    }
892    /// Returns a tree-like string representation of the schema.
893    ///
894    /// This method formats the schema
895    /// with a tree-like structure showing field names, types, and nullability.
896    ///
897    /// # Example
898    ///
899    /// ```
900    /// use arrow::datatypes::{DataType, Field, Schema};
901    /// use datafusion_common::DFSchema;
902    /// use std::collections::HashMap;
903    ///
904    /// let schema = DFSchema::from_unqualified_fields(
905    ///     vec![
906    ///         Field::new("id", DataType::Int32, false),
907    ///         Field::new("name", DataType::Utf8, true),
908    ///     ]
909    ///     .into(),
910    ///     HashMap::new(),
911    /// )
912    /// .unwrap();
913    ///
914    /// assert_eq!(
915    ///     schema.tree_string().to_string(),
916    ///     r#"root
917    ///  |-- id: int32 (nullable = false)
918    ///  |-- name: utf8 (nullable = true)"#
919    /// );
920    /// ```
921    pub fn tree_string(&self) -> impl Display + '_ {
922        let mut result = String::from("root\n");
923
924        for (qualifier, field) in self.iter() {
925            let field_name = match qualifier {
926                Some(q) => format!("{}.{}", q, field.name()),
927                None => field.name().to_string(),
928            };
929
930            format_field_with_indent(
931                &mut result,
932                &field_name,
933                field.data_type(),
934                field.is_nullable(),
935                " ",
936            );
937        }
938
939        // Remove the trailing newline
940        if result.ends_with('\n') {
941            result.pop();
942        }
943
944        result
945    }
946}
947
948/// Format field with proper nested indentation for complex types
949fn format_field_with_indent(
950    result: &mut String,
951    field_name: &str,
952    data_type: &DataType,
953    nullable: bool,
954    indent: &str,
955) {
956    let nullable_str = nullable.to_string().to_lowercase();
957    let child_indent = format!("{indent}|    ");
958
959    match data_type {
960        DataType::List(field) => {
961            result.push_str(&format!(
962                "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
963            ));
964            format_field_with_indent(
965                result,
966                field.name(),
967                field.data_type(),
968                field.is_nullable(),
969                &child_indent,
970            );
971        }
972        DataType::LargeList(field) => {
973            result.push_str(&format!(
974                "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
975            ));
976            format_field_with_indent(
977                result,
978                field.name(),
979                field.data_type(),
980                field.is_nullable(),
981                &child_indent,
982            );
983        }
984        DataType::FixedSizeList(field, _size) => {
985            result.push_str(&format!(
986                "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
987            ));
988            format_field_with_indent(
989                result,
990                field.name(),
991                field.data_type(),
992                field.is_nullable(),
993                &child_indent,
994            );
995        }
996        DataType::Map(field, _) => {
997            result.push_str(&format!(
998                "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
999            ));
1000            if let DataType::Struct(inner_fields) = field.data_type()
1001                && inner_fields.len() == 2
1002            {
1003                format_field_with_indent(
1004                    result,
1005                    "key",
1006                    inner_fields[0].data_type(),
1007                    inner_fields[0].is_nullable(),
1008                    &child_indent,
1009                );
1010                let value_contains_null = field.is_nullable().to_string().to_lowercase();
1011                // Handle complex value types properly
1012                match inner_fields[1].data_type() {
1013                    DataType::Struct(_)
1014                    | DataType::List(_)
1015                    | DataType::LargeList(_)
1016                    | DataType::FixedSizeList(_, _)
1017                    | DataType::Map(_, _) => {
1018                        format_field_with_indent(
1019                            result,
1020                            "value",
1021                            inner_fields[1].data_type(),
1022                            inner_fields[1].is_nullable(),
1023                            &child_indent,
1024                        );
1025                    }
1026                    _ => {
1027                        result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1028                                format_simple_data_type(inner_fields[1].data_type())));
1029                    }
1030                }
1031            }
1032        }
1033        DataType::Struct(fields) => {
1034            result.push_str(&format!(
1035                "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1036            ));
1037            for struct_field in fields {
1038                format_field_with_indent(
1039                    result,
1040                    struct_field.name(),
1041                    struct_field.data_type(),
1042                    struct_field.is_nullable(),
1043                    &child_indent,
1044                );
1045            }
1046        }
1047        _ => {
1048            let type_str = format_simple_data_type(data_type);
1049            result.push_str(&format!(
1050                "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1051            ));
1052        }
1053    }
1054}
1055
1056/// Format simple DataType in lowercase format (for leaf nodes)
1057fn format_simple_data_type(data_type: &DataType) -> String {
1058    match data_type {
1059        DataType::Boolean => "boolean".to_string(),
1060        DataType::Int8 => "int8".to_string(),
1061        DataType::Int16 => "int16".to_string(),
1062        DataType::Int32 => "int32".to_string(),
1063        DataType::Int64 => "int64".to_string(),
1064        DataType::UInt8 => "uint8".to_string(),
1065        DataType::UInt16 => "uint16".to_string(),
1066        DataType::UInt32 => "uint32".to_string(),
1067        DataType::UInt64 => "uint64".to_string(),
1068        DataType::Float16 => "float16".to_string(),
1069        DataType::Float32 => "float32".to_string(),
1070        DataType::Float64 => "float64".to_string(),
1071        DataType::Utf8 => "utf8".to_string(),
1072        DataType::LargeUtf8 => "large_utf8".to_string(),
1073        DataType::Binary => "binary".to_string(),
1074        DataType::LargeBinary => "large_binary".to_string(),
1075        DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1076        DataType::Date32 => "date32".to_string(),
1077        DataType::Date64 => "date64".to_string(),
1078        DataType::Time32(_) => "time32".to_string(),
1079        DataType::Time64(_) => "time64".to_string(),
1080        DataType::Timestamp(_, tz) => match tz {
1081            Some(tz_str) => format!("timestamp ({tz_str})"),
1082            None => "timestamp".to_string(),
1083        },
1084        DataType::Interval(_) => "interval".to_string(),
1085        DataType::Dictionary(_, value_type) => {
1086            format_simple_data_type(value_type.as_ref())
1087        }
1088        DataType::Decimal32(precision, scale) => {
1089            format!("decimal32({precision}, {scale})")
1090        }
1091        DataType::Decimal64(precision, scale) => {
1092            format!("decimal64({precision}, {scale})")
1093        }
1094        DataType::Decimal128(precision, scale) => {
1095            format!("decimal128({precision}, {scale})")
1096        }
1097        DataType::Decimal256(precision, scale) => {
1098            format!("decimal256({precision}, {scale})")
1099        }
1100        DataType::Null => "null".to_string(),
1101        _ => format!("{data_type}").to_lowercase(),
1102    }
1103}
1104
/// Allow DFSchema to be converted into an Arrow `&Schema`.
///
/// The conversion is a borrow obtained from `as_arrow`.
impl AsRef<Schema> for DFSchema {
    fn as_ref(&self) -> &Schema {
        self.as_arrow()
    }
}
1111
/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
/// example).
///
/// The conversion is a borrow obtained from `inner`.
impl AsRef<SchemaRef> for DFSchema {
    fn as_ref(&self) -> &SchemaRef {
        self.inner()
    }
}
1119
1120/// Create a `DFSchema` from an Arrow schema
1121impl TryFrom<Schema> for DFSchema {
1122    type Error = DataFusionError;
1123    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
1124        Self::try_from(Arc::new(schema))
1125    }
1126}
1127
1128impl TryFrom<SchemaRef> for DFSchema {
1129    type Error = DataFusionError;
1130    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1131        let field_count = schema.fields.len();
1132        let dfschema = Self {
1133            inner: schema,
1134            field_qualifiers: vec![None; field_count],
1135            functional_dependencies: FunctionalDependencies::empty(),
1136        };
1137        // Without checking names, because schema here may have duplicate field names.
1138        // For example, Partial AggregateMode will generate duplicate field names from
1139        // state_fields.
1140        // See <https://github.com/apache/datafusion/issues/17715>
1141        // dfschema.check_names()?;
1142        Ok(dfschema)
1143    }
1144}
1145
1146impl From<DFSchema> for SchemaRef {
1147    fn from(dfschema: DFSchema) -> Self {
1148        Arc::clone(&dfschema.inner)
1149    }
1150}
1151
1152// Hashing refers to a subset of fields considered in PartialEq.
1153impl Hash for DFSchema {
1154    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1155        self.inner.fields.hash(state);
1156        self.inner.metadata.len().hash(state); // HashMap is not hashable
1157    }
1158}
1159
1160/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
1161pub trait ToDFSchema
1162where
1163    Self: Sized,
1164{
1165    /// Attempt to create a DSSchema
1166    fn to_dfschema(self) -> Result<DFSchema>;
1167
1168    /// Attempt to create a DSSchemaRef
1169    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1170        Ok(Arc::new(self.to_dfschema()?))
1171    }
1172}
1173
1174impl ToDFSchema for Schema {
1175    fn to_dfschema(self) -> Result<DFSchema> {
1176        DFSchema::try_from(self)
1177    }
1178}
1179
1180impl ToDFSchema for SchemaRef {
1181    fn to_dfschema(self) -> Result<DFSchema> {
1182        DFSchema::try_from(self)
1183    }
1184}
1185
1186impl ToDFSchema for Vec<Field> {
1187    fn to_dfschema(self) -> Result<DFSchema> {
1188        let field_count = self.len();
1189        let schema = Schema {
1190            fields: self.into(),
1191            metadata: HashMap::new(),
1192        };
1193        let dfschema = DFSchema {
1194            inner: schema.into(),
1195            field_qualifiers: vec![None; field_count],
1196            functional_dependencies: FunctionalDependencies::empty(),
1197        };
1198        Ok(dfschema)
1199    }
1200}
1201
1202impl Display for DFSchema {
1203    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1204        write!(
1205            f,
1206            "fields:[{}], metadata:{:?}",
1207            self.iter()
1208                .map(|(q, f)| qualified_name(q, f.name()))
1209                .collect::<Vec<String>>()
1210                .join(", "),
1211            self.inner.metadata
1212        )
1213    }
1214}
1215
1216/// Provides schema information needed by certain methods of `Expr`
1217/// (defined in the datafusion-common crate).
1218///
1219/// Note that this trait is implemented for &[DFSchema] which is
1220/// widely used in the DataFusion codebase.
1221pub trait ExprSchema: std::fmt::Debug {
1222    /// Is this column reference nullable?
1223    fn nullable(&self, col: &Column) -> Result<bool> {
1224        Ok(self.field_from_column(col)?.is_nullable())
1225    }
1226
1227    /// What is the datatype of this column?
1228    fn data_type(&self, col: &Column) -> Result<&DataType> {
1229        Ok(self.field_from_column(col)?.data_type())
1230    }
1231
1232    /// Returns the column's optional metadata.
1233    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1234        Ok(self.field_from_column(col)?.metadata())
1235    }
1236
1237    /// Return the column's datatype and nullability
1238    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1239        let field = self.field_from_column(col)?;
1240        Ok((field.data_type(), field.is_nullable()))
1241    }
1242
1243    // Return the column's field
1244    fn field_from_column(&self, col: &Column) -> Result<&FieldRef>;
1245}
1246
1247// Implement `ExprSchema` for `Arc<DFSchema>`
1248impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1249    fn nullable(&self, col: &Column) -> Result<bool> {
1250        self.as_ref().nullable(col)
1251    }
1252
1253    fn data_type(&self, col: &Column) -> Result<&DataType> {
1254        self.as_ref().data_type(col)
1255    }
1256
1257    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1258        ExprSchema::metadata(self.as_ref(), col)
1259    }
1260
1261    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1262        self.as_ref().data_type_and_nullable(col)
1263    }
1264
1265    fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1266        self.as_ref().field_from_column(col)
1267    }
1268}
1269
1270impl ExprSchema for DFSchema {
1271    fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1272        match &col.relation {
1273            Some(r) => self.field_with_qualified_name(r, &col.name),
1274            None => self.field_with_unqualified_name(&col.name),
1275        }
1276    }
1277}
1278
/// DataFusion-specific extensions to [`Schema`].
pub trait SchemaExt {
    /// This is a specialized version of Eq that ignores differences
    /// in nullability and metadata.
    ///
    /// It works the same as [`DFSchema::equivalent_names_and_types`].
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns `Ok(())` if the two schemas have the same number of fields
    /// and each pair of fields has the same name and logically equivalent
    /// data types. Returns an error otherwise (the [`Schema`] implementation
    /// reports a plan error).
    ///
    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
    /// equivalence checking.
    ///
    /// It is only used by insert into cases.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1296
1297impl SchemaExt for Schema {
1298    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1299        if self.fields().len() != other.fields().len() {
1300            return false;
1301        }
1302
1303        self.fields()
1304            .iter()
1305            .zip(other.fields().iter())
1306            .all(|(f1, f2)| {
1307                f1.name() == f2.name()
1308                    && DFSchema::datatype_is_semantically_equal(
1309                        f1.data_type(),
1310                        f2.data_type(),
1311                    )
1312            })
1313    }
1314
1315    // It is only used by insert into cases.
1316    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1317        // case 1 : schema length mismatch
1318        if self.fields().len() != other.fields().len() {
1319            _plan_err!(
1320                "Inserting query must have the same schema length as the table. \
1321            Expected table schema length: {}, got: {}",
1322                self.fields().len(),
1323                other.fields().len()
1324            )
1325        } else {
1326            // case 2 : schema length match, but fields mismatch
1327            // check if the fields name are the same and have the same data types
1328            self.fields()
1329                .iter()
1330                .zip(other.fields().iter())
1331                .try_for_each(|(f1, f2)| {
1332                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1333                        _plan_err!(
1334                            "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1335                            but got '{}' with type {}.",
1336                            f1.name(),
1337                            f1.data_type(),
1338                            f2.name(),
1339                            f2.data_type())
1340                    } else {
1341                        Ok(())
1342                    }
1343                })
1344        }
1345    }
1346}
1347
1348/// Build a fully-qualified field name string. This is equivalent to
1349/// `format!("{q}.{name}")` when `qualifier` is `Some`, or just `name` when
1350/// `None`. We avoid going through the `fmt` machinery for performance reasons.
1351pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1352    let qualifier = match qualifier {
1353        None => return name.to_string(),
1354        Some(q) => q,
1355    };
1356    let (first, second, third) = match qualifier {
1357        TableReference::Bare { table } => (table.as_ref(), None, None),
1358        TableReference::Partial { schema, table } => {
1359            (schema.as_ref(), Some(table.as_ref()), None)
1360        }
1361        TableReference::Full {
1362            catalog,
1363            schema,
1364            table,
1365        } => (
1366            catalog.as_ref(),
1367            Some(schema.as_ref()),
1368            Some(table.as_ref()),
1369        ),
1370    };
1371
1372    let extra = second.map_or(0, str::len) + third.map_or(0, str::len);
1373    let mut s = String::with_capacity(first.len() + extra + 3 + name.len());
1374    s.push_str(first);
1375    if let Some(second) = second {
1376        s.push('.');
1377        s.push_str(second);
1378    }
1379    if let Some(third) = third {
1380        s.push('.');
1381        s.push_str(third);
1382    }
1383    s.push('.');
1384    s.push_str(name);
1385    s
1386}
1387
1388#[cfg(test)]
1389mod tests {
1390    use crate::assert_contains;
1391
1392    use super::*;
1393
1394    /// `qualified_name` doesn't use `TableReference::Display` for performance
1395    /// reasons, but check that the output is consistent.
1396    #[test]
1397    fn qualified_name_agrees_with_display() {
1398        let cases: &[(Option<TableReference>, &str)] = &[
1399            (None, "col"),
1400            (Some(TableReference::bare("t")), "c0"),
1401            (Some(TableReference::partial("s", "t")), "c0"),
1402            (Some(TableReference::full("c", "s", "t")), "c0"),
1403            (Some(TableReference::bare("mytable")), "some_column_name"),
1404            // Empty segments must be preserved so that distinct qualified
1405            // fields don't collide in `DFSchema::field_names()`.
1406            (Some(TableReference::bare("")), "col"),
1407            (Some(TableReference::partial("s", "")), "col"),
1408            (Some(TableReference::partial("", "t")), "col"),
1409            (Some(TableReference::full("c", "", "t")), "col"),
1410            (Some(TableReference::full("", "s", "t")), "col"),
1411            (Some(TableReference::full("c", "s", "")), "col"),
1412            (Some(TableReference::full("", "", "")), "col"),
1413        ];
1414        for (qualifier, name) in cases {
1415            let actual = qualified_name(qualifier.as_ref(), name);
1416            let expected = match qualifier {
1417                Some(q) => format!("{q}.{name}"),
1418                None => name.to_string(),
1419            };
1420            assert_eq!(actual, expected, "qualifier={qualifier:?} name={name}");
1421        }
1422    }
1423
1424    #[test]
1425    fn qualifier_in_name() -> Result<()> {
1426        let col = Column::from_name("t1.c0");
1427        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1428        // lookup with unqualified name "t1.c0"
1429        let err = schema.index_of_column(&col).unwrap_err();
1430        let expected = "Schema error: No field named \"t1.c0\". \
1431            Column names are case sensitive. \
1432            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1433            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1434            Did you mean 't1.c0'?.";
1435        assert_eq!(err.strip_backtrace(), expected);
1436        Ok(())
1437    }
1438
1439    #[test]
1440    fn quoted_qualifiers_in_name() -> Result<()> {
1441        let col = Column::from_name("t1.c0");
1442        let schema = DFSchema::try_from_qualified_schema(
1443            "t1",
1444            &Schema::new(vec![
1445                Field::new("CapitalColumn", DataType::Boolean, true),
1446                Field::new("field.with.period", DataType::Boolean, true),
1447            ]),
1448        )?;
1449
1450        // lookup with unqualified name "t1.c0"
1451        let err = schema.index_of_column(&col).unwrap_err();
1452        let expected = "Schema error: No field named \"t1.c0\". \
1453            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1454        assert_eq!(err.strip_backtrace(), expected);
1455        Ok(())
1456    }
1457
1458    #[test]
1459    fn from_unqualified_schema() -> Result<()> {
1460        let schema = DFSchema::try_from(test_schema_1())?;
1461        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1462        Ok(())
1463    }
1464
1465    #[test]
1466    fn from_qualified_schema() -> Result<()> {
1467        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1468        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1469        Ok(())
1470    }
1471
1472    #[test]
1473    fn test_from_field_specific_qualified_schema() -> Result<()> {
1474        let schema = DFSchema::from_field_specific_qualified_schema(
1475            vec![Some("t1".into()), None],
1476            &Arc::new(Schema::new(vec![
1477                Field::new("c0", DataType::Boolean, true),
1478                Field::new("c1", DataType::Boolean, true),
1479            ])),
1480        )?;
1481        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1482        Ok(())
1483    }
1484
1485    #[test]
1486    fn test_from_qualified_fields() -> Result<()> {
1487        let schema = DFSchema::new_with_metadata(
1488            vec![
1489                (
1490                    Some("t0".into()),
1491                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1492                ),
1493                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1494            ],
1495            HashMap::new(),
1496        )?;
1497        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1498        Ok(())
1499    }
1500
1501    #[test]
1502    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1503        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1504        let arrow_schema = schema.as_arrow();
1505        insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
1506        Ok(())
1507    }
1508
1509    #[test]
1510    fn join_qualified() -> Result<()> {
1511        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1512        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1513        let join = left.join(&right)?;
1514        assert_eq!(
1515            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1516            join.to_string()
1517        );
1518        // test valid access
1519        assert!(
1520            join.field_with_qualified_name(&TableReference::bare("t1"), "c0")
1521                .is_ok()
1522        );
1523        assert!(
1524            join.field_with_qualified_name(&TableReference::bare("t2"), "c0")
1525                .is_ok()
1526        );
1527        // test invalid access
1528        assert!(join.field_with_unqualified_name("c0").is_err());
1529        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1530        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1531        Ok(())
1532    }
1533
1534    #[test]
1535    fn join_qualified_duplicate() -> Result<()> {
1536        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1537        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1538        let join = left.join(&right);
1539        assert_eq!(
1540            join.unwrap_err().strip_backtrace(),
1541            "Schema error: Schema contains duplicate qualified field name t1.c0",
1542        );
1543        Ok(())
1544    }
1545
1546    #[test]
1547    fn join_unqualified_duplicate() -> Result<()> {
1548        let left = DFSchema::try_from(test_schema_1())?;
1549        let right = DFSchema::try_from(test_schema_1())?;
1550        let join = left.join(&right);
1551        assert_eq!(
1552            join.unwrap_err().strip_backtrace(),
1553            "Schema error: Schema contains duplicate unqualified field name c0"
1554        );
1555        Ok(())
1556    }
1557
1558    #[test]
1559    fn join_mixed() -> Result<()> {
1560        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1561        let right = DFSchema::try_from(test_schema_2())?;
1562        let join = left.join(&right)?;
1563        assert_eq!(
1564            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1565            join.to_string()
1566        );
1567        // test valid access
1568        assert!(
1569            join.field_with_qualified_name(&TableReference::bare("t1"), "c0")
1570                .is_ok()
1571        );
1572        assert!(join.field_with_unqualified_name("c0").is_ok());
1573        assert!(join.field_with_unqualified_name("c100").is_ok());
1574        assert!(join.field_with_name(None, "c100").is_ok());
1575        // test invalid access
1576        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1577        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1578        assert!(
1579            join.field_with_qualified_name(&TableReference::bare(""), "c100")
1580                .is_err()
1581        );
1582        Ok(())
1583    }
1584
1585    #[test]
1586    fn join_mixed_duplicate() -> Result<()> {
1587        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1588        let right = DFSchema::try_from(test_schema_1())?;
1589        let join = left.join(&right);
1590        assert_contains!(
1591            join.unwrap_err().to_string(),
1592            "Schema error: Schema contains qualified \
1593                          field name t1.c0 and unqualified field name c0 which would be ambiguous"
1594        );
1595        Ok(())
1596    }
1597
1598    #[test]
1599    fn helpful_error_messages() -> Result<()> {
1600        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1601        let expected_help = "Valid fields are t1.c0, t1.c1.";
1602        assert_contains!(
1603            schema
1604                .field_with_qualified_name(&TableReference::bare("x"), "y")
1605                .unwrap_err()
1606                .to_string(),
1607            expected_help
1608        );
1609        assert_contains!(
1610            schema
1611                .field_with_unqualified_name("y")
1612                .unwrap_err()
1613                .to_string(),
1614            expected_help
1615        );
1616        assert!(schema.index_of_column_by_name(None, "y").is_none());
1617        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1618
1619        Ok(())
1620    }
1621
1622    #[test]
1623    fn select_without_valid_fields() {
1624        let schema = DFSchema::empty();
1625
1626        let col = Column::from_qualified_name("t1.c0");
1627        let err = schema.index_of_column(&col).unwrap_err();
1628        let expected = "Schema error: No field named t1.c0.";
1629        assert_eq!(err.strip_backtrace(), expected);
1630
1631        // the same check without qualifier
1632        let col = Column::from_name("c0");
1633        let err = schema.index_of_column(&col).err().unwrap();
1634        let expected = "Schema error: No field named c0.";
1635        assert_eq!(err.strip_backtrace(), expected);
1636    }
1637
1638    #[test]
1639    fn into() {
1640        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1641        let arrow_schema = Schema::new_with_metadata(
1642            vec![Field::new("c0", DataType::Int64, true)],
1643            test_metadata(),
1644        );
1645        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1646
1647        let df_schema = DFSchema {
1648            inner: Arc::clone(&arrow_schema_ref),
1649            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1650            functional_dependencies: FunctionalDependencies::empty(),
1651        };
1652        let df_schema_ref = Arc::new(df_schema.clone());
1653
1654        {
1655            let arrow_schema = arrow_schema.clone();
1656            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1657
1658            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1659            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1660        }
1661
1662        {
1663            let arrow_schema = arrow_schema.clone();
1664            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1665
1666            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1667            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1668        }
1669
1670        // Now, consume the refs
1671        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1672        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1673    }
1674
1675    fn test_schema_1() -> Schema {
1676        Schema::new(vec![
1677            Field::new("c0", DataType::Boolean, true),
1678            Field::new("c1", DataType::Boolean, true),
1679        ])
1680    }
1681    #[test]
1682    fn test_dfschema_to_schema_conversion() {
1683        let mut a_metadata = HashMap::new();
1684        a_metadata.insert("key".to_string(), "value".to_string());
1685        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1686
1687        let mut b_metadata = HashMap::new();
1688        b_metadata.insert("key".to_string(), "value".to_string());
1689        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1690
1691        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1692
1693        let df_schema = DFSchema {
1694            inner: Arc::clone(&schema),
1695            field_qualifiers: vec![None; schema.fields.len()],
1696            functional_dependencies: FunctionalDependencies::empty(),
1697        };
1698
1699        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1700    }
1701
1702    #[test]
1703    fn test_contain_column() -> Result<()> {
1704        // qualified exists
1705        {
1706            let col = Column::from_qualified_name("t1.c0");
1707            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1708            assert!(schema.is_column_from_schema(&col));
1709        }
1710
1711        // qualified not exists
1712        {
1713            let col = Column::from_qualified_name("t1.c2");
1714            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1715            assert!(!schema.is_column_from_schema(&col));
1716        }
1717
1718        // unqualified exists
1719        {
1720            let col = Column::from_name("c0");
1721            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1722            assert!(schema.is_column_from_schema(&col));
1723        }
1724
1725        // unqualified not exists
1726        {
1727            let col = Column::from_name("c2");
1728            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1729            assert!(!schema.is_column_from_schema(&col));
1730        }
1731
1732        Ok(())
1733    }
1734
1735    #[test]
1736    fn test_datatype_is_logically_equal() {
1737        assert!(DFSchema::datatype_is_logically_equal(
1738            &DataType::Int8,
1739            &DataType::Int8
1740        ));
1741
1742        assert!(!DFSchema::datatype_is_logically_equal(
1743            &DataType::Int8,
1744            &DataType::Int16
1745        ));
1746
1747        // Test lists
1748
1749        // Succeeds if both have the same element type, disregards names and nullability
1750        assert!(DFSchema::datatype_is_logically_equal(
1751            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1752            &DataType::List(Field::new("element", DataType::Int8, false).into())
1753        ));
1754
1755        // Fails if element type is different
1756        assert!(!DFSchema::datatype_is_logically_equal(
1757            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1758            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1759        ));
1760
1761        // Test maps
1762        let map_field = DataType::Map(
1763            Field::new(
1764                "entries",
1765                DataType::Struct(Fields::from(vec![
1766                    Field::new("key", DataType::Int8, false),
1767                    Field::new("value", DataType::Int8, true),
1768                ])),
1769                true,
1770            )
1771            .into(),
1772            true,
1773        );
1774
1775        // Succeeds if both maps have the same key and value types, disregards names and nullability
1776        assert!(DFSchema::datatype_is_logically_equal(
1777            &map_field,
1778            &DataType::Map(
1779                Field::new(
1780                    "pairs",
1781                    DataType::Struct(Fields::from(vec![
1782                        Field::new("one", DataType::Int8, false),
1783                        Field::new("two", DataType::Int8, false)
1784                    ])),
1785                    true
1786                )
1787                .into(),
1788                true
1789            )
1790        ));
1791        // Fails if value type is different
1792        assert!(!DFSchema::datatype_is_logically_equal(
1793            &map_field,
1794            &DataType::Map(
1795                Field::new(
1796                    "entries",
1797                    DataType::Struct(Fields::from(vec![
1798                        Field::new("key", DataType::Int8, false),
1799                        Field::new("value", DataType::Int16, true)
1800                    ])),
1801                    true
1802                )
1803                .into(),
1804                true
1805            )
1806        ));
1807
1808        // Fails if key type is different
1809        assert!(!DFSchema::datatype_is_logically_equal(
1810            &map_field,
1811            &DataType::Map(
1812                Field::new(
1813                    "entries",
1814                    DataType::Struct(Fields::from(vec![
1815                        Field::new("key", DataType::Int16, false),
1816                        Field::new("value", DataType::Int8, true)
1817                    ])),
1818                    true
1819                )
1820                .into(),
1821                true
1822            )
1823        ));
1824
1825        // Test structs
1826
1827        let struct_field = DataType::Struct(Fields::from(vec![
1828            Field::new("a", DataType::Int8, true),
1829            Field::new("b", DataType::Int8, true),
1830        ]));
1831
1832        // Succeeds if both have same names and datatypes, ignores nullability
1833        assert!(DFSchema::datatype_is_logically_equal(
1834            &struct_field,
1835            &DataType::Struct(Fields::from(vec![
1836                Field::new("a", DataType::Int8, false),
1837                Field::new("b", DataType::Int8, true),
1838            ]))
1839        ));
1840
1841        // Fails if field names are different
1842        assert!(!DFSchema::datatype_is_logically_equal(
1843            &struct_field,
1844            &DataType::Struct(Fields::from(vec![
1845                Field::new("x", DataType::Int8, true),
1846                Field::new("y", DataType::Int8, true),
1847            ]))
1848        ));
1849
1850        // Fails if types are different
1851        assert!(!DFSchema::datatype_is_logically_equal(
1852            &struct_field,
1853            &DataType::Struct(Fields::from(vec![
1854                Field::new("a", DataType::Int16, true),
1855                Field::new("b", DataType::Int8, true),
1856            ]))
1857        ));
1858
1859        // Fails if more or less fields
1860        assert!(!DFSchema::datatype_is_logically_equal(
1861            &struct_field,
1862            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1863        ));
1864    }
1865
1866    #[test]
1867    fn test_datatype_is_logically_equivalent_to_dictionary() {
1868        // Dictionary is logically equal to its value type
1869        assert!(DFSchema::datatype_is_logically_equal(
1870            &DataType::Utf8,
1871            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1872        ));
1873
1874        // Dictionary is logically equal to the logically equivalent value type
1875        assert!(DFSchema::datatype_is_logically_equal(
1876            &DataType::Utf8View,
1877            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1878        ));
1879
1880        assert!(DFSchema::datatype_is_logically_equal(
1881            &DataType::Dictionary(
1882                Box::new(DataType::Int32),
1883                Box::new(DataType::List(
1884                    Field::new("element", DataType::Utf8, false).into()
1885                ))
1886            ),
1887            &DataType::Dictionary(
1888                Box::new(DataType::Int32),
1889                Box::new(DataType::List(
1890                    Field::new("element", DataType::Utf8View, false).into()
1891                ))
1892            )
1893        ));
1894    }
1895
1896    #[test]
1897    fn test_datatype_is_semantically_equal() {
1898        assert!(DFSchema::datatype_is_semantically_equal(
1899            &DataType::Int8,
1900            &DataType::Int8
1901        ));
1902
1903        assert!(!DFSchema::datatype_is_semantically_equal(
1904            &DataType::Int8,
1905            &DataType::Int16
1906        ));
1907
1908        // Succeeds if decimal precision and scale are different
1909        assert!(DFSchema::datatype_is_semantically_equal(
1910            &DataType::Decimal32(1, 2),
1911            &DataType::Decimal32(2, 1),
1912        ));
1913
1914        assert!(DFSchema::datatype_is_semantically_equal(
1915            &DataType::Decimal64(1, 2),
1916            &DataType::Decimal64(2, 1),
1917        ));
1918
1919        assert!(DFSchema::datatype_is_semantically_equal(
1920            &DataType::Decimal128(1, 2),
1921            &DataType::Decimal128(2, 1),
1922        ));
1923
1924        assert!(DFSchema::datatype_is_semantically_equal(
1925            &DataType::Decimal256(1, 2),
1926            &DataType::Decimal256(2, 1),
1927        ));
1928
1929        // Any two timestamp types should match
1930        assert!(DFSchema::datatype_is_semantically_equal(
1931            &DataType::Timestamp(
1932                arrow::datatypes::TimeUnit::Microsecond,
1933                Some("UTC".into())
1934            ),
1935            &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1936        ));
1937
1938        // Test lists
1939
1940        // Succeeds if both have the same element type, disregards names and nullability
1941        assert!(DFSchema::datatype_is_semantically_equal(
1942            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1943            &DataType::List(Field::new("element", DataType::Int8, false).into())
1944        ));
1945
1946        // Fails if element type is different
1947        assert!(!DFSchema::datatype_is_semantically_equal(
1948            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1949            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1950        ));
1951
1952        // Test maps
1953        let map_field = DataType::Map(
1954            Field::new(
1955                "entries",
1956                DataType::Struct(Fields::from(vec![
1957                    Field::new("key", DataType::Int8, false),
1958                    Field::new("value", DataType::Int8, true),
1959                ])),
1960                true,
1961            )
1962            .into(),
1963            true,
1964        );
1965
1966        // Succeeds if both maps have the same key and value types, disregards names and nullability
1967        assert!(DFSchema::datatype_is_semantically_equal(
1968            &map_field,
1969            &DataType::Map(
1970                Field::new(
1971                    "pairs",
1972                    DataType::Struct(Fields::from(vec![
1973                        Field::new("one", DataType::Int8, false),
1974                        Field::new("two", DataType::Int8, false)
1975                    ])),
1976                    true
1977                )
1978                .into(),
1979                true
1980            )
1981        ));
1982        // Fails if value type is different
1983        assert!(!DFSchema::datatype_is_semantically_equal(
1984            &map_field,
1985            &DataType::Map(
1986                Field::new(
1987                    "entries",
1988                    DataType::Struct(Fields::from(vec![
1989                        Field::new("key", DataType::Int8, false),
1990                        Field::new("value", DataType::Int16, true)
1991                    ])),
1992                    true
1993                )
1994                .into(),
1995                true
1996            )
1997        ));
1998
1999        // Fails if key type is different
2000        assert!(!DFSchema::datatype_is_semantically_equal(
2001            &map_field,
2002            &DataType::Map(
2003                Field::new(
2004                    "entries",
2005                    DataType::Struct(Fields::from(vec![
2006                        Field::new("key", DataType::Int16, false),
2007                        Field::new("value", DataType::Int8, true)
2008                    ])),
2009                    true
2010                )
2011                .into(),
2012                true
2013            )
2014        ));
2015
2016        // Test structs
2017
2018        let struct_field = DataType::Struct(Fields::from(vec![
2019            Field::new("a", DataType::Int8, true),
2020            Field::new("b", DataType::Int8, true),
2021        ]));
2022
2023        // Succeeds if both have same names and datatypes, ignores nullability
2024        assert!(DFSchema::datatype_is_logically_equal(
2025            &struct_field,
2026            &DataType::Struct(Fields::from(vec![
2027                Field::new("a", DataType::Int8, false),
2028                Field::new("b", DataType::Int8, true),
2029            ]))
2030        ));
2031
2032        // Fails if field names are different
2033        assert!(!DFSchema::datatype_is_logically_equal(
2034            &struct_field,
2035            &DataType::Struct(Fields::from(vec![
2036                Field::new("x", DataType::Int8, true),
2037                Field::new("y", DataType::Int8, true),
2038            ]))
2039        ));
2040
2041        // Fails if types are different
2042        assert!(!DFSchema::datatype_is_logically_equal(
2043            &struct_field,
2044            &DataType::Struct(Fields::from(vec![
2045                Field::new("a", DataType::Int16, true),
2046                Field::new("b", DataType::Int8, true),
2047            ]))
2048        ));
2049
2050        // Fails if more or less fields
2051        assert!(!DFSchema::datatype_is_logically_equal(
2052            &struct_field,
2053            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
2054        ));
2055    }
2056
2057    #[test]
2058    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
2059        // Dictionary is not semantically equal to its value type
2060        assert!(!DFSchema::datatype_is_semantically_equal(
2061            &DataType::Utf8,
2062            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
2063        ));
2064    }
2065
2066    fn test_schema_2() -> Schema {
2067        Schema::new(vec![
2068            Field::new("c100", DataType::Boolean, true),
2069            Field::new("c101", DataType::Boolean, true),
2070        ])
2071    }
2072
2073    fn test_metadata() -> HashMap<String, String> {
2074        test_metadata_n(2)
2075    }
2076
2077    fn test_metadata_n(n: usize) -> HashMap<String, String> {
2078        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
2079    }
2080
2081    #[test]
2082    fn test_print_schema_unqualified() {
2083        let schema = DFSchema::from_unqualified_fields(
2084            vec![
2085                Field::new("id", DataType::Int32, false),
2086                Field::new("name", DataType::Utf8, true),
2087                Field::new("age", DataType::Int64, true),
2088                Field::new("active", DataType::Boolean, false),
2089            ]
2090            .into(),
2091            HashMap::new(),
2092        )
2093        .unwrap();
2094
2095        let output = schema.tree_string();
2096
2097        insta::assert_snapshot!(output, @r"
2098        root
2099         |-- id: int32 (nullable = false)
2100         |-- name: utf8 (nullable = true)
2101         |-- age: int64 (nullable = true)
2102         |-- active: boolean (nullable = false)
2103        ");
2104    }
2105
2106    #[test]
2107    fn test_print_schema_qualified() {
2108        let schema = DFSchema::try_from_qualified_schema(
2109            "table1",
2110            &Schema::new(vec![
2111                Field::new("id", DataType::Int32, false),
2112                Field::new("name", DataType::Utf8, true),
2113            ]),
2114        )
2115        .unwrap();
2116
2117        let output = schema.tree_string();
2118
2119        insta::assert_snapshot!(output, @r"
2120        root
2121         |-- table1.id: int32 (nullable = false)
2122         |-- table1.name: utf8 (nullable = true)
2123        ");
2124    }
2125
2126    #[test]
2127    fn test_print_schema_complex_types() {
2128        let struct_field = Field::new(
2129            "address",
2130            DataType::Struct(Fields::from(vec![
2131                Field::new("street", DataType::Utf8, true),
2132                Field::new("city", DataType::Utf8, true),
2133            ])),
2134            true,
2135        );
2136
2137        let list_field = Field::new(
2138            "tags",
2139            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2140            true,
2141        );
2142
2143        let schema = DFSchema::from_unqualified_fields(
2144            vec![
2145                Field::new("id", DataType::Int32, false),
2146                struct_field,
2147                list_field,
2148                Field::new("score", DataType::Decimal128(10, 2), true),
2149            ]
2150            .into(),
2151            HashMap::new(),
2152        )
2153        .unwrap();
2154
2155        let output = schema.tree_string();
2156        insta::assert_snapshot!(output, @r"
2157        root
2158         |-- id: int32 (nullable = false)
2159         |-- address: struct (nullable = true)
2160         |    |-- street: utf8 (nullable = true)
2161         |    |-- city: utf8 (nullable = true)
2162         |-- tags: list (nullable = true)
2163         |    |-- item: utf8 (nullable = true)
2164         |-- score: decimal128(10, 2) (nullable = true)
2165        ");
2166    }
2167
2168    #[test]
2169    fn test_print_schema_empty() {
2170        let schema = DFSchema::empty();
2171        let output = schema.tree_string();
2172        insta::assert_snapshot!(output, @"root");
2173    }
2174
2175    #[test]
2176    fn test_print_schema_deeply_nested_types() {
2177        // Create a deeply nested structure to test indentation and complex type formatting
2178        let inner_struct = Field::new(
2179            "inner",
2180            DataType::Struct(Fields::from(vec![
2181                Field::new("level1", DataType::Utf8, true),
2182                Field::new("level2", DataType::Int32, false),
2183            ])),
2184            true,
2185        );
2186
2187        let nested_list = Field::new(
2188            "nested_list",
2189            DataType::List(Arc::new(Field::new(
2190                "item",
2191                DataType::Struct(Fields::from(vec![
2192                    Field::new("id", DataType::Int64, false),
2193                    Field::new("value", DataType::Float64, true),
2194                ])),
2195                true,
2196            ))),
2197            true,
2198        );
2199
2200        let map_field = Field::new(
2201            "map_data",
2202            DataType::Map(
2203                Arc::new(Field::new(
2204                    "entries",
2205                    DataType::Struct(Fields::from(vec![
2206                        Field::new("key", DataType::Utf8, false),
2207                        Field::new(
2208                            "value",
2209                            DataType::List(Arc::new(Field::new(
2210                                "item",
2211                                DataType::Int32,
2212                                true,
2213                            ))),
2214                            true,
2215                        ),
2216                    ])),
2217                    false,
2218                )),
2219                false,
2220            ),
2221            true,
2222        );
2223
2224        let schema = DFSchema::from_unqualified_fields(
2225            vec![
2226                Field::new("simple_field", DataType::Utf8, true),
2227                inner_struct,
2228                nested_list,
2229                map_field,
2230                Field::new(
2231                    "timestamp_field",
2232                    DataType::Timestamp(
2233                        arrow::datatypes::TimeUnit::Microsecond,
2234                        Some("UTC".into()),
2235                    ),
2236                    false,
2237                ),
2238            ]
2239            .into(),
2240            HashMap::new(),
2241        )
2242        .unwrap();
2243
2244        let output = schema.tree_string();
2245
2246        insta::assert_snapshot!(output, @r"
2247        root
2248         |-- simple_field: utf8 (nullable = true)
2249         |-- inner: struct (nullable = true)
2250         |    |-- level1: utf8 (nullable = true)
2251         |    |-- level2: int32 (nullable = false)
2252         |-- nested_list: list (nullable = true)
2253         |    |-- item: struct (nullable = true)
2254         |    |    |-- id: int64 (nullable = false)
2255         |    |    |-- value: float64 (nullable = true)
2256         |-- map_data: map (nullable = true)
2257         |    |-- key: utf8 (nullable = false)
2258         |    |-- value: list (nullable = true)
2259         |    |    |-- item: int32 (nullable = true)
2260         |-- timestamp_field: timestamp (UTC) (nullable = false)
2261        ");
2262    }
2263
2264    #[test]
2265    fn test_print_schema_mixed_qualified_unqualified() {
2266        // Test a schema with mixed qualified and unqualified fields
2267        let schema = DFSchema::new_with_metadata(
2268            vec![
2269                (
2270                    Some("table1".into()),
2271                    Arc::new(Field::new("id", DataType::Int32, false)),
2272                ),
2273                (None, Arc::new(Field::new("name", DataType::Utf8, true))),
2274                (
2275                    Some("table2".into()),
2276                    Arc::new(Field::new("score", DataType::Float64, true)),
2277                ),
2278                (
2279                    None,
2280                    Arc::new(Field::new("active", DataType::Boolean, false)),
2281                ),
2282            ],
2283            HashMap::new(),
2284        )
2285        .unwrap();
2286
2287        let output = schema.tree_string();
2288
2289        insta::assert_snapshot!(output, @r"
2290        root
2291         |-- table1.id: int32 (nullable = false)
2292         |-- name: utf8 (nullable = true)
2293         |-- table2.score: float64 (nullable = true)
2294         |-- active: boolean (nullable = false)
2295        ");
2296    }
2297
2298    #[test]
2299    fn test_print_schema_array_of_map() {
2300        // Test the specific example from user feedback: array of map
2301        let map_field = Field::new(
2302            "entries",
2303            DataType::Struct(Fields::from(vec![
2304                Field::new("key", DataType::Utf8, false),
2305                Field::new("value", DataType::Utf8, false),
2306            ])),
2307            false,
2308        );
2309
2310        let array_of_map_field = Field::new(
2311            "array_map_field",
2312            DataType::List(Arc::new(Field::new(
2313                "item",
2314                DataType::Map(Arc::new(map_field), false),
2315                false,
2316            ))),
2317            false,
2318        );
2319
2320        let schema = DFSchema::from_unqualified_fields(
2321            vec![array_of_map_field].into(),
2322            HashMap::new(),
2323        )
2324        .unwrap();
2325
2326        let output = schema.tree_string();
2327
2328        insta::assert_snapshot!(output, @r"
2329        root
2330         |-- array_map_field: list (nullable = false)
2331         |    |-- item: map (nullable = false)
2332         |    |    |-- key: utf8 (nullable = false)
2333         |    |    |-- value: utf8 (nullable = false)
2334        ");
2335    }
2336
2337    #[test]
2338    fn test_print_schema_complex_type_combinations() {
2339        // Test various combinations of list, struct, and map types
2340
2341        // List of structs
2342        let list_of_structs = Field::new(
2343            "list_of_structs",
2344            DataType::List(Arc::new(Field::new(
2345                "item",
2346                DataType::Struct(Fields::from(vec![
2347                    Field::new("id", DataType::Int32, false),
2348                    Field::new("name", DataType::Utf8, true),
2349                    Field::new("score", DataType::Float64, true),
2350                ])),
2351                true,
2352            ))),
2353            true,
2354        );
2355
2356        // Struct containing lists
2357        let struct_with_lists = Field::new(
2358            "struct_with_lists",
2359            DataType::Struct(Fields::from(vec![
2360                Field::new(
2361                    "tags",
2362                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2363                    true,
2364                ),
2365                Field::new(
2366                    "scores",
2367                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
2368                    false,
2369                ),
2370                Field::new("metadata", DataType::Utf8, true),
2371            ])),
2372            false,
2373        );
2374
2375        // Map with struct values
2376        let map_with_struct_values = Field::new(
2377            "map_with_struct_values",
2378            DataType::Map(
2379                Arc::new(Field::new(
2380                    "entries",
2381                    DataType::Struct(Fields::from(vec![
2382                        Field::new("key", DataType::Utf8, false),
2383                        Field::new(
2384                            "value",
2385                            DataType::Struct(Fields::from(vec![
2386                                Field::new("count", DataType::Int64, false),
2387                                Field::new("active", DataType::Boolean, true),
2388                            ])),
2389                            true,
2390                        ),
2391                    ])),
2392                    false,
2393                )),
2394                false,
2395            ),
2396            true,
2397        );
2398
2399        // List of maps
2400        let list_of_maps = Field::new(
2401            "list_of_maps",
2402            DataType::List(Arc::new(Field::new(
2403                "item",
2404                DataType::Map(
2405                    Arc::new(Field::new(
2406                        "entries",
2407                        DataType::Struct(Fields::from(vec![
2408                            Field::new("key", DataType::Utf8, false),
2409                            Field::new("value", DataType::Int32, true),
2410                        ])),
2411                        false,
2412                    )),
2413                    false,
2414                ),
2415                true,
2416            ))),
2417            true,
2418        );
2419
2420        // Deeply nested: struct containing list of structs containing maps
2421        let deeply_nested = Field::new(
2422            "deeply_nested",
2423            DataType::Struct(Fields::from(vec![
2424                Field::new("level1", DataType::Utf8, true),
2425                Field::new(
2426                    "level2",
2427                    DataType::List(Arc::new(Field::new(
2428                        "item",
2429                        DataType::Struct(Fields::from(vec![
2430                            Field::new("id", DataType::Int32, false),
2431                            Field::new(
2432                                "properties",
2433                                DataType::Map(
2434                                    Arc::new(Field::new(
2435                                        "entries",
2436                                        DataType::Struct(Fields::from(vec![
2437                                            Field::new("key", DataType::Utf8, false),
2438                                            Field::new("value", DataType::Float64, true),
2439                                        ])),
2440                                        false,
2441                                    )),
2442                                    false,
2443                                ),
2444                                true,
2445                            ),
2446                        ])),
2447                        true,
2448                    ))),
2449                    false,
2450                ),
2451            ])),
2452            true,
2453        );
2454
2455        let schema = DFSchema::from_unqualified_fields(
2456            vec![
2457                list_of_structs,
2458                struct_with_lists,
2459                map_with_struct_values,
2460                list_of_maps,
2461                deeply_nested,
2462            ]
2463            .into(),
2464            HashMap::new(),
2465        )
2466        .unwrap();
2467
2468        let output = schema.tree_string();
2469
2470        insta::assert_snapshot!(output, @r"
2471        root
2472         |-- list_of_structs: list (nullable = true)
2473         |    |-- item: struct (nullable = true)
2474         |    |    |-- id: int32 (nullable = false)
2475         |    |    |-- name: utf8 (nullable = true)
2476         |    |    |-- score: float64 (nullable = true)
2477         |-- struct_with_lists: struct (nullable = false)
2478         |    |-- tags: list (nullable = true)
2479         |    |    |-- item: utf8 (nullable = true)
2480         |    |-- scores: list (nullable = false)
2481         |    |    |-- item: int32 (nullable = true)
2482         |    |-- metadata: utf8 (nullable = true)
2483         |-- map_with_struct_values: map (nullable = true)
2484         |    |-- key: utf8 (nullable = false)
2485         |    |-- value: struct (nullable = true)
2486         |    |    |-- count: int64 (nullable = false)
2487         |    |    |-- active: boolean (nullable = true)
2488         |-- list_of_maps: list (nullable = true)
2489         |    |-- item: map (nullable = true)
2490         |    |    |-- key: utf8 (nullable = false)
2491         |    |    |-- value: int32 (nullable = false)
2492         |-- deeply_nested: struct (nullable = true)
2493         |    |-- level1: utf8 (nullable = true)
2494         |    |-- level2: list (nullable = false)
2495         |    |    |-- item: struct (nullable = true)
2496         |    |    |    |-- id: int32 (nullable = false)
2497         |    |    |    |-- properties: map (nullable = true)
2498         |    |    |    |    |-- key: utf8 (nullable = false)
2499         |    |    |    |    |-- value: float64 (nullable = false)
2500        ");
2501    }
2502
2503    #[test]
2504    fn test_print_schema_edge_case_types() {
2505        // Test edge cases and special types
2506        let schema = DFSchema::from_unqualified_fields(
2507            vec![
2508                Field::new("null_field", DataType::Null, true),
2509                Field::new("binary_field", DataType::Binary, false),
2510                Field::new("large_binary", DataType::LargeBinary, true),
2511                Field::new("large_utf8", DataType::LargeUtf8, false),
2512                Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
2513                Field::new(
2514                    "fixed_size_list",
2515                    DataType::FixedSizeList(
2516                        Arc::new(Field::new("item", DataType::Int32, true)),
2517                        5,
2518                    ),
2519                    false,
2520                ),
2521                Field::new("decimal32", DataType::Decimal32(9, 4), true),
2522                Field::new("decimal64", DataType::Decimal64(9, 4), true),
2523                Field::new("decimal128", DataType::Decimal128(18, 4), true),
2524                Field::new("decimal256", DataType::Decimal256(38, 10), false),
2525                Field::new("date32", DataType::Date32, true),
2526                Field::new("date64", DataType::Date64, false),
2527                Field::new(
2528                    "time32_seconds",
2529                    DataType::Time32(arrow::datatypes::TimeUnit::Second),
2530                    true,
2531                ),
2532                Field::new(
2533                    "time64_nanoseconds",
2534                    DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
2535                    false,
2536                ),
2537            ]
2538            .into(),
2539            HashMap::new(),
2540        )
2541        .unwrap();
2542
2543        let output = schema.tree_string();
2544
2545        insta::assert_snapshot!(output, @r"
2546        root
2547         |-- null_field: null (nullable = true)
2548         |-- binary_field: binary (nullable = false)
2549         |-- large_binary: large_binary (nullable = true)
2550         |-- large_utf8: large_utf8 (nullable = false)
2551         |-- fixed_size_binary: fixed_size_binary (nullable = true)
2552         |-- fixed_size_list: fixed size list (nullable = false)
2553         |    |-- item: int32 (nullable = true)
2554         |-- decimal32: decimal32(9, 4) (nullable = true)
2555         |-- decimal64: decimal64(9, 4) (nullable = true)
2556         |-- decimal128: decimal128(18, 4) (nullable = true)
2557         |-- decimal256: decimal256(38, 10) (nullable = false)
2558         |-- date32: date32 (nullable = true)
2559         |-- date64: date64 (nullable = false)
2560         |-- time32_seconds: time32 (nullable = true)
2561         |-- time64_nanoseconds: time64 (nullable = false)
2562        ");
2563    }
2564}