datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28    field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29    SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
37/// A reference-counted reference to a [DFSchema].
38pub type DFSchemaRef = Arc<DFSchema>;
39
40/// DFSchema wraps an Arrow schema and adds relation names.
41///
42/// The schema may hold the fields across multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified fields must be unique not only amongst themselves, but also must
47/// have a distinct name from any qualified field names. This allows finding a
48/// qualified field by name to be possible, so long as there aren't multiple
49/// qualified fields with the same name.
50///
51/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
52///
53/// # Creating qualified schemas
54///
55/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
56/// an Arrow schema.
57///
58/// ```rust
59/// use datafusion_common::{DFSchema, Column};
60/// use arrow::datatypes::{DataType, Field, Schema};
61///
62/// let arrow_schema = Schema::new(vec![
63///    Field::new("c1", DataType::Int32, false),
64/// ]);
65///
66/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
67/// let column = Column::from_qualified_name("t1.c1");
68/// assert!(df_schema.has_column(&column));
69///
70/// // Can also access qualified fields with unqualified name, if it's unambiguous
71/// let column = Column::from_qualified_name("c1");
72/// assert!(df_schema.has_column(&column));
73/// ```
74///
75/// # Creating unqualified schemas
76///
77/// Create an unqualified schema using TryFrom:
78///
79/// ```rust
80/// use datafusion_common::{DFSchema, Column};
81/// use arrow::datatypes::{DataType, Field, Schema};
82///
83/// let arrow_schema = Schema::new(vec![
84///    Field::new("c1", DataType::Int32, false),
85/// ]);
86///
87/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
88/// let column = Column::new_unqualified("c1");
89/// assert!(df_schema.has_column(&column));
90/// ```
91///
92/// # Converting back to Arrow schema
93///
94/// Use the `Into` trait to convert `DFSchema` into an Arrow schema:
95///
96/// ```rust
97/// use datafusion_common::DFSchema;
98/// use arrow::datatypes::{Schema, Field};
99/// use std::collections::HashMap;
100///
101/// let df_schema = DFSchema::from_unqualified_fields(vec![
102///    Field::new("c1", arrow::datatypes::DataType::Int32, false),
103/// ].into(),HashMap::new()).unwrap();
104/// let schema = Schema::from(df_schema);
105/// assert_eq!(schema.fields().len(), 1);
106/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Inner Arrow schema reference. Holds the fields and the schema-level
    /// metadata, but carries no qualifier information.
    inner: SchemaRef,
    /// Optional qualifiers for each column in this schema. In the same order as
    /// the `self.inner.fields()`; `None` marks an unqualified field.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}
117
118impl DFSchema {
119    /// Creates an empty `DFSchema`
120    pub fn empty() -> Self {
121        Self {
122            inner: Arc::new(Schema::new([])),
123            field_qualifiers: vec![],
124            functional_dependencies: FunctionalDependencies::empty(),
125        }
126    }
127
128    /// Return a reference to the inner Arrow [`Schema`]
129    ///
130    /// Note this does not have the qualifier information
131    pub fn as_arrow(&self) -> &Schema {
132        self.inner.as_ref()
133    }
134
135    /// Return a reference to the inner Arrow [`SchemaRef`]
136    ///
137    /// Note this does not have the qualifier information
138    pub fn inner(&self) -> &SchemaRef {
139        &self.inner
140    }
141
142    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
143    pub fn new_with_metadata(
144        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
145        metadata: HashMap<String, String>,
146    ) -> Result<Self> {
147        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
148            qualified_fields.into_iter().unzip();
149
150        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
151
152        let dfschema = Self {
153            inner: schema,
154            field_qualifiers: qualifiers,
155            functional_dependencies: FunctionalDependencies::empty(),
156        };
157        dfschema.check_names()?;
158        Ok(dfschema)
159    }
160
161    /// Create a new `DFSchema` from a list of Arrow [Field]s
162    pub fn from_unqualified_fields(
163        fields: Fields,
164        metadata: HashMap<String, String>,
165    ) -> Result<Self> {
166        let field_count = fields.len();
167        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
168        let dfschema = Self {
169            inner: schema,
170            field_qualifiers: vec![None; field_count],
171            functional_dependencies: FunctionalDependencies::empty(),
172        };
173        dfschema.check_names()?;
174        Ok(dfschema)
175    }
176
177    /// Create a `DFSchema` from an Arrow schema and a given qualifier
178    ///
179    /// To create a schema from an Arrow schema without a qualifier, use
180    /// `DFSchema::try_from`.
181    pub fn try_from_qualified_schema(
182        qualifier: impl Into<TableReference>,
183        schema: &Schema,
184    ) -> Result<Self> {
185        let qualifier = qualifier.into();
186        let schema = DFSchema {
187            inner: schema.clone().into(),
188            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
189            functional_dependencies: FunctionalDependencies::empty(),
190        };
191        schema.check_names()?;
192        Ok(schema)
193    }
194
195    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
196    pub fn from_field_specific_qualified_schema(
197        qualifiers: Vec<Option<TableReference>>,
198        schema: &SchemaRef,
199    ) -> Result<Self> {
200        let dfschema = Self {
201            inner: Arc::clone(schema),
202            field_qualifiers: qualifiers,
203            functional_dependencies: FunctionalDependencies::empty(),
204        };
205        dfschema.check_names()?;
206        Ok(dfschema)
207    }
208
209    /// Check if the schema have some fields with the same name
210    pub fn check_names(&self) -> Result<()> {
211        let mut qualified_names = BTreeSet::new();
212        let mut unqualified_names = BTreeSet::new();
213
214        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
215            if let Some(qualifier) = qualifier {
216                if !qualified_names.insert((qualifier, field.name())) {
217                    return _schema_err!(SchemaError::DuplicateQualifiedField {
218                        qualifier: Box::new(qualifier.clone()),
219                        name: field.name().to_string(),
220                    });
221                }
222            } else if !unqualified_names.insert(field.name()) {
223                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
224                    name: field.name().to_string()
225                });
226            }
227        }
228
229        for (qualifier, name) in qualified_names {
230            if unqualified_names.contains(name) {
231                return _schema_err!(SchemaError::AmbiguousReference {
232                    field: Column::new(Some(qualifier.clone()), name)
233                });
234            }
235        }
236        Ok(())
237    }
238
239    /// Assigns functional dependencies.
240    pub fn with_functional_dependencies(
241        mut self,
242        functional_dependencies: FunctionalDependencies,
243    ) -> Result<Self> {
244        if functional_dependencies.is_valid(self.inner.fields.len()) {
245            self.functional_dependencies = functional_dependencies;
246            Ok(self)
247        } else {
248            _plan_err!(
249                "Invalid functional dependency: {:?}",
250                functional_dependencies
251            )
252        }
253    }
254
255    /// Create a new schema that contains the fields from this schema followed by the fields
256    /// from the supplied schema. An error will be returned if there are duplicate field names.
257    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
258        let mut schema_builder = SchemaBuilder::new();
259        schema_builder.extend(self.inner.fields().iter().cloned());
260        schema_builder.extend(schema.fields().iter().cloned());
261        let new_schema = schema_builder.finish();
262
263        let mut new_metadata = self.inner.metadata.clone();
264        new_metadata.extend(schema.inner.metadata.clone());
265        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
266
267        let mut new_qualifiers = self.field_qualifiers.clone();
268        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
269
270        let new_self = Self {
271            inner: Arc::new(new_schema_with_metadata),
272            field_qualifiers: new_qualifiers,
273            functional_dependencies: FunctionalDependencies::empty(),
274        };
275        new_self.check_names()?;
276        Ok(new_self)
277    }
278
279    /// Modify this schema by appending the fields from the supplied schema, ignoring any
280    /// duplicate fields.
281    pub fn merge(&mut self, other_schema: &DFSchema) {
282        if other_schema.inner.fields.is_empty() {
283            return;
284        }
285
286        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
287            self.iter().collect();
288        let self_unqualified_names: HashSet<&str> = self
289            .inner
290            .fields
291            .iter()
292            .map(|field| field.name().as_str())
293            .collect();
294
295        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
296        let mut qualifiers = Vec::new();
297        for (qualifier, field) in other_schema.iter() {
298            // skip duplicate columns
299            let duplicated_field = match qualifier {
300                Some(q) => self_fields.contains(&(Some(q), field)),
301                // for unqualified columns, check as unqualified name
302                None => self_unqualified_names.contains(field.name().as_str()),
303            };
304            if !duplicated_field {
305                schema_builder.push(Arc::clone(field));
306                qualifiers.push(qualifier.cloned());
307            }
308        }
309        let mut metadata = self.inner.metadata.clone();
310        metadata.extend(other_schema.inner.metadata.clone());
311
312        let finished = schema_builder.finish();
313        let finished_with_metadata = finished.with_metadata(metadata);
314        self.inner = finished_with_metadata.into();
315        self.field_qualifiers.extend(qualifiers);
316    }
317
318    /// Get a list of fields
319    pub fn fields(&self) -> &Fields {
320        &self.inner.fields
321    }
322
323    /// Returns an immutable reference of a specific `Field` instance selected using an
324    /// offset within the internal `fields` vector
325    pub fn field(&self, i: usize) -> &Field {
326        &self.inner.fields[i]
327    }
328
329    /// Returns an immutable reference of a specific `Field` instance selected using an
330    /// offset within the internal `fields` vector and its qualifier
331    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
332        (self.field_qualifiers[i].as_ref(), self.field(i))
333    }
334
335    pub fn index_of_column_by_name(
336        &self,
337        qualifier: Option<&TableReference>,
338        name: &str,
339    ) -> Option<usize> {
340        let mut matches = self
341            .iter()
342            .enumerate()
343            .filter(|(_, (q, f))| match (qualifier, q) {
344                // field to lookup is qualified.
345                // current field is qualified and not shared between relations, compare both
346                // qualifier and name.
347                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
348                // field to lookup is qualified but current field is unqualified.
349                (Some(_), None) => false,
350                // field to lookup is unqualified, no need to compare qualifier
351                (None, Some(_)) | (None, None) => f.name() == name,
352            })
353            .map(|(idx, _)| idx);
354        matches.next()
355    }
356
357    /// Find the index of the column with the given qualifier and name,
358    /// returning `None` if not found
359    ///
360    /// See [Self::index_of_column] for a version that returns an error if the
361    /// column is not found
362    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
363        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
364    }
365
366    /// Find the index of the column with the given qualifier and name,
367    /// returning `Err` if not found
368    ///
369    /// See [Self::maybe_index_of_column] for a version that returns `None` if
370    /// the column is not found
371    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
372        self.maybe_index_of_column(col)
373            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
374    }
375
376    /// Check if the column is in the current schema
377    pub fn is_column_from_schema(&self, col: &Column) -> bool {
378        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
379            .is_some()
380    }
381
382    /// Find the field with the given name
383    pub fn field_with_name(
384        &self,
385        qualifier: Option<&TableReference>,
386        name: &str,
387    ) -> Result<&Field> {
388        if let Some(qualifier) = qualifier {
389            self.field_with_qualified_name(qualifier, name)
390        } else {
391            self.field_with_unqualified_name(name)
392        }
393    }
394
395    /// Find the qualified field with the given name
396    pub fn qualified_field_with_name(
397        &self,
398        qualifier: Option<&TableReference>,
399        name: &str,
400    ) -> Result<(Option<&TableReference>, &Field)> {
401        if let Some(qualifier) = qualifier {
402            let idx = self
403                .index_of_column_by_name(Some(qualifier), name)
404                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
405            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
406        } else {
407            self.qualified_field_with_unqualified_name(name)
408        }
409    }
410
411    /// Find all fields having the given qualifier
412    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
413        self.iter()
414            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
415            .map(|(_, f)| f.as_ref())
416            .collect()
417    }
418
419    /// Find all fields indices having the given qualifier
420    pub fn fields_indices_with_qualified(
421        &self,
422        qualifier: &TableReference,
423    ) -> Vec<usize> {
424        self.iter()
425            .enumerate()
426            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
427            .collect()
428    }
429
430    /// Find all fields that match the given name
431    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
432        self.fields()
433            .iter()
434            .filter(|field| field.name() == name)
435            .map(|f| f.as_ref())
436            .collect()
437    }
438
439    /// Find all fields that match the given name and return them with their qualifier
440    pub fn qualified_fields_with_unqualified_name(
441        &self,
442        name: &str,
443    ) -> Vec<(Option<&TableReference>, &Field)> {
444        self.iter()
445            .filter(|(_, field)| field.name() == name)
446            .map(|(qualifier, field)| (qualifier, field.as_ref()))
447            .collect()
448    }
449
450    /// Find all fields that match the given name and convert to column
451    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
452        self.iter()
453            .filter(|(_, field)| field.name() == name)
454            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
455            .collect()
456    }
457
458    /// Return all `Column`s for the schema
459    pub fn columns(&self) -> Vec<Column> {
460        self.iter()
461            .map(|(qualifier, field)| {
462                Column::new(qualifier.cloned(), field.name().clone())
463            })
464            .collect()
465    }
466
467    /// Find the qualified field with the given unqualified name
468    pub fn qualified_field_with_unqualified_name(
469        &self,
470        name: &str,
471    ) -> Result<(Option<&TableReference>, &Field)> {
472        let matches = self.qualified_fields_with_unqualified_name(name);
473        match matches.len() {
474            0 => Err(unqualified_field_not_found(name, self)),
475            1 => Ok((matches[0].0, (matches[0].1))),
476            _ => {
477                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
478                // Because name may generate from Alias/... . It means that it don't own qualifier.
479                // For example:
480                //             Join on id = b.id
481                // Project a.id as id   TableScan b id
482                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
483                // one field without qualifier, we should return it.
484                let fields_without_qualifier = matches
485                    .iter()
486                    .filter(|(q, _)| q.is_none())
487                    .collect::<Vec<_>>();
488                if fields_without_qualifier.len() == 1 {
489                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
490                } else {
491                    _schema_err!(SchemaError::AmbiguousReference {
492                        field: Column::new_unqualified(name.to_string(),),
493                    })
494                }
495            }
496        }
497    }
498
499    /// Find the field with the given name
500    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
501        self.qualified_field_with_unqualified_name(name)
502            .map(|(_, field)| field)
503    }
504
505    /// Find the field with the given qualified name
506    pub fn field_with_qualified_name(
507        &self,
508        qualifier: &TableReference,
509        name: &str,
510    ) -> Result<&Field> {
511        let idx = self
512            .index_of_column_by_name(Some(qualifier), name)
513            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
514
515        Ok(self.field(idx))
516    }
517
518    /// Find the field with the given qualified column
519    pub fn field_from_column(&self, column: &Column) -> Result<&Field> {
520        match &column.relation {
521            Some(r) => self.field_with_qualified_name(r, &column.name),
522            None => self.field_with_unqualified_name(&column.name),
523        }
524    }
525
526    /// Find the field with the given qualified column
527    pub fn qualified_field_from_column(
528        &self,
529        column: &Column,
530    ) -> Result<(Option<&TableReference>, &Field)> {
531        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
532    }
533
534    /// Find if the field exists with the given name
535    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
536        self.fields().iter().any(|field| field.name() == name)
537    }
538
539    /// Find if the field exists with the given qualified name
540    pub fn has_column_with_qualified_name(
541        &self,
542        qualifier: &TableReference,
543        name: &str,
544    ) -> bool {
545        self.iter()
546            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
547    }
548
549    /// Find if the field exists with the given qualified column
550    pub fn has_column(&self, column: &Column) -> bool {
551        match &column.relation {
552            Some(r) => self.has_column_with_qualified_name(r, &column.name),
553            None => self.has_column_with_unqualified_name(&column.name),
554        }
555    }
556
557    /// Check to see if unqualified field names matches field names in Arrow schema
558    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
559        self.inner
560            .fields
561            .iter()
562            .zip(arrow_schema.fields().iter())
563            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
564    }
565
566    /// Check to see if fields in 2 Arrow schemas are compatible
567    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
568    pub fn check_arrow_schema_type_compatible(
569        &self,
570        arrow_schema: &Schema,
571    ) -> Result<()> {
572        let self_arrow_schema: Schema = self.into();
573        self_arrow_schema
574            .fields()
575            .iter()
576            .zip(arrow_schema.fields().iter())
577            .try_for_each(|(l_field, r_field)| {
578                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
579                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
580                                r_field.name(),
581                                r_field.data_type(),
582                                l_field.name(),
583                                l_field.data_type())
584                } else {
585                    Ok(())
586                }
587            })
588    }
589
590    /// Returns true if the two schemas have the same qualified named
591    /// fields with logically equivalent data types. Returns false otherwise.
592    ///
593    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
594    /// equivalence checking.
595    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
596        if self.fields().len() != other.fields().len() {
597            return false;
598        }
599        let self_fields = self.iter();
600        let other_fields = other.iter();
601        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
602            q1 == q2
603                && f1.name() == f2.name()
604                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
605        })
606    }
607
608    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
609    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
610        self.has_equivalent_names_and_types(other).is_ok()
611    }
612
613    /// Returns Ok if the two schemas have the same qualified named
614    /// fields with the compatible data types.
615    ///
616    /// Returns an `Err` with a message otherwise.
617    ///
618    /// This is a specialized version of Eq that ignores differences in
619    /// nullability and metadata.
620    ///
621    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
622    /// logical type checking, which for example would consider a dictionary
623    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
624    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
625        // case 1 : schema length mismatch
626        if self.fields().len() != other.fields().len() {
627            _plan_err!(
628                "Schema mismatch: the schema length are not same \
629            Expected schema length: {}, got: {}",
630                self.fields().len(),
631                other.fields().len()
632            )
633        } else {
634            // case 2 : schema length match, but fields mismatch
635            // check if the fields name are the same and have the same data types
636            self.fields()
637                .iter()
638                .zip(other.fields().iter())
639                .try_for_each(|(f1, f2)| {
640                    if f1.name() != f2.name()
641                        || (!DFSchema::datatype_is_semantically_equal(
642                            f1.data_type(),
643                            f2.data_type(),
644                        ))
645                    {
646                        _plan_err!(
647                            "Schema mismatch: Expected field '{}' with type {:?}, \
648                            but got '{}' with type {:?}.",
649                            f1.name(),
650                            f1.data_type(),
651                            f2.name(),
652                            f2.data_type()
653                        )
654                    } else {
655                        Ok(())
656                    }
657                })
658        }
659    }
660
661    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
662    /// than datatype_is_semantically_equal in that different representations of same data can be
663    /// logically but not semantically equivalent. Semantically equivalent types are always also
664    /// logically equivalent. For example:
665    /// - a Dictionary<K,V> type is logically equal to a plain V type
666    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
667    /// - Utf8 and Utf8View are logically equal
668    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
669        // check nested fields
670        match (dt1, dt2) {
671            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
672                v1.as_ref() == v2.as_ref()
673            }
674            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
675            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
676            (DataType::List(f1), DataType::List(f2))
677            | (DataType::LargeList(f1), DataType::LargeList(f2))
678            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
679                // Don't compare the names of the technical inner field
680                // Usually "item" but that's not mandated
681                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
682            }
683            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
684                // Don't compare the names of the technical inner fields
685                // Usually "entries", "key", "value" but that's not mandated
686                match (f1.data_type(), f2.data_type()) {
687                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
688                        f1_inner.len() == f2_inner.len()
689                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
690                                Self::datatype_is_logically_equal(
691                                    f1.data_type(),
692                                    f2.data_type(),
693                                )
694                            })
695                    }
696                    _ => panic!("Map type should have an inner struct field"),
697                }
698            }
699            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
700                let iter1 = fields1.iter();
701                let iter2 = fields2.iter();
702                fields1.len() == fields2.len() &&
703                        // all fields have to be the same
704                    iter1
705                    .zip(iter2)
706                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
707            }
708            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
709                let iter1 = fields1.iter();
710                let iter2 = fields2.iter();
711                fields1.len() == fields2.len() &&
712                    // all fields have to be the same
713                    iter1
714                        .zip(iter2)
715                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
716            }
717            // Utf8 and Utf8View are logically equivalent
718            (DataType::Utf8, DataType::Utf8View) => true,
719            (DataType::Utf8View, DataType::Utf8) => true,
720            _ => Self::datatype_is_semantically_equal(dt1, dt2),
721        }
722    }
723
724    /// Returns true of two [`DataType`]s are semantically equal (same
725    /// name and type), ignoring both metadata and nullability, and decimal precision/scale.
726    ///
727    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
728    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
729        // check nested fields
730        match (dt1, dt2) {
731            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
732                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
733                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
734            }
735            (DataType::List(f1), DataType::List(f2))
736            | (DataType::LargeList(f1), DataType::LargeList(f2))
737            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
738                // Don't compare the names of the technical inner field
739                // Usually "item" but that's not mandated
740                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
741            }
742            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
743                // Don't compare the names of the technical inner fields
744                // Usually "entries", "key", "value" but that's not mandated
745                match (f1.data_type(), f2.data_type()) {
746                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
747                        f1_inner.len() == f2_inner.len()
748                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
749                                Self::datatype_is_semantically_equal(
750                                    f1.data_type(),
751                                    f2.data_type(),
752                                )
753                            })
754                    }
755                    _ => panic!("Map type should have an inner struct field"),
756                }
757            }
758            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
759                let iter1 = fields1.iter();
760                let iter2 = fields2.iter();
761                fields1.len() == fields2.len() &&
762                        // all fields have to be the same
763                    iter1
764                    .zip(iter2)
765                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
766            }
767            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
768                let iter1 = fields1.iter();
769                let iter2 = fields2.iter();
770                fields1.len() == fields2.len() &&
771                    // all fields have to be the same
772                    iter1
773                        .zip(iter2)
774                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
775            }
776            (
777                DataType::Decimal128(_l_precision, _l_scale),
778                DataType::Decimal128(_r_precision, _r_scale),
779            ) => true,
780            (
781                DataType::Decimal256(_l_precision, _l_scale),
782                DataType::Decimal256(_r_precision, _r_scale),
783            ) => true,
784            _ => dt1 == dt2,
785        }
786    }
787
788    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
789        f1.name() == f2.name()
790            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
791    }
792
793    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
794        f1.name() == f2.name()
795            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
796    }
797
798    /// Strip all field qualifier in schema
799    pub fn strip_qualifiers(self) -> Self {
800        DFSchema {
801            field_qualifiers: vec![None; self.inner.fields.len()],
802            inner: self.inner,
803            functional_dependencies: self.functional_dependencies,
804        }
805    }
806
807    /// Replace all field qualifier with new value in schema
808    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
809        let qualifier = qualifier.into();
810        DFSchema {
811            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
812            inner: self.inner,
813            functional_dependencies: self.functional_dependencies,
814        }
815    }
816
817    /// Get list of fully-qualified field names in this schema
818    pub fn field_names(&self) -> Vec<String> {
819        self.iter()
820            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
821            .collect::<Vec<_>>()
822    }
823
824    /// Get metadata of this schema
825    pub fn metadata(&self) -> &HashMap<String, String> {
826        &self.inner.metadata
827    }
828
829    /// Get functional dependencies
830    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
831        &self.functional_dependencies
832    }
833
834    /// Iterate over the qualifiers and fields in the DFSchema
835    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
836        self.field_qualifiers
837            .iter()
838            .zip(self.inner.fields().iter())
839            .map(|(qualifier, field)| (qualifier.as_ref(), field))
840    }
841}
842
843impl From<DFSchema> for Schema {
844    /// Convert DFSchema into a Schema
845    fn from(df_schema: DFSchema) -> Self {
846        let fields: Fields = df_schema.inner.fields.clone();
847        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
848    }
849}
850
851impl From<&DFSchema> for Schema {
852    /// Convert DFSchema reference into a Schema
853    fn from(df_schema: &DFSchema) -> Self {
854        let fields: Fields = df_schema.inner.fields.clone();
855        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
856    }
857}
858
859/// Allow DFSchema to be converted into an Arrow `&Schema`
860impl AsRef<Schema> for DFSchema {
861    fn as_ref(&self) -> &Schema {
862        self.as_arrow()
863    }
864}
865
866/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
867/// example)
868impl AsRef<SchemaRef> for DFSchema {
869    fn as_ref(&self) -> &SchemaRef {
870        self.inner()
871    }
872}
873
874/// Create a `DFSchema` from an Arrow schema
875impl TryFrom<Schema> for DFSchema {
876    type Error = DataFusionError;
877    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
878        Self::try_from(Arc::new(schema))
879    }
880}
881
882impl TryFrom<SchemaRef> for DFSchema {
883    type Error = DataFusionError;
884    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
885        let field_count = schema.fields.len();
886        let dfschema = Self {
887            inner: schema,
888            field_qualifiers: vec![None; field_count],
889            functional_dependencies: FunctionalDependencies::empty(),
890        };
891        Ok(dfschema)
892    }
893}
894
895impl From<DFSchema> for SchemaRef {
896    fn from(df_schema: DFSchema) -> Self {
897        SchemaRef::new(df_schema.into())
898    }
899}
900
901// Hashing refers to a subset of fields considered in PartialEq.
902impl Hash for DFSchema {
903    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
904        self.inner.fields.hash(state);
905        self.inner.metadata.len().hash(state); // HashMap is not hashable
906    }
907}
908
909/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
910pub trait ToDFSchema
911where
912    Self: Sized,
913{
914    /// Attempt to create a DSSchema
915    fn to_dfschema(self) -> Result<DFSchema>;
916
917    /// Attempt to create a DSSchemaRef
918    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
919        Ok(Arc::new(self.to_dfschema()?))
920    }
921}
922
923impl ToDFSchema for Schema {
924    fn to_dfschema(self) -> Result<DFSchema> {
925        DFSchema::try_from(self)
926    }
927}
928
929impl ToDFSchema for SchemaRef {
930    fn to_dfschema(self) -> Result<DFSchema> {
931        DFSchema::try_from(self)
932    }
933}
934
935impl ToDFSchema for Vec<Field> {
936    fn to_dfschema(self) -> Result<DFSchema> {
937        let field_count = self.len();
938        let schema = Schema {
939            fields: self.into(),
940            metadata: HashMap::new(),
941        };
942        let dfschema = DFSchema {
943            inner: schema.into(),
944            field_qualifiers: vec![None; field_count],
945            functional_dependencies: FunctionalDependencies::empty(),
946        };
947        Ok(dfschema)
948    }
949}
950
951impl Display for DFSchema {
952    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
953        write!(
954            f,
955            "fields:[{}], metadata:{:?}",
956            self.iter()
957                .map(|(q, f)| qualified_name(q, f.name()))
958                .collect::<Vec<String>>()
959                .join(", "),
960            self.inner.metadata
961        )
962    }
963}
964
/// Provides schema information needed by certain methods of `Expr`
/// (defined in the datafusion-common crate).
///
/// Every method looks a column up by its [`Column`] reference and returns an
/// error when the lookup fails.
///
/// Note that this trait is implemented for &[DFSchema] which is
/// widely used in the DataFusion codebase.
pub trait ExprSchema: std::fmt::Debug {
    /// Is this column reference nullable?
    fn nullable(&self, col: &Column) -> Result<bool>;

    /// What is the datatype of this column?
    fn data_type(&self, col: &Column) -> Result<&DataType>;

    /// Returns the column's optional metadata.
    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>>;

    /// Return the column's datatype and nullability
    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)>;
}
983
984// Implement `ExprSchema` for `Arc<DFSchema>`
985impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
986    fn nullable(&self, col: &Column) -> Result<bool> {
987        self.as_ref().nullable(col)
988    }
989
990    fn data_type(&self, col: &Column) -> Result<&DataType> {
991        self.as_ref().data_type(col)
992    }
993
994    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
995        ExprSchema::metadata(self.as_ref(), col)
996    }
997
998    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
999        self.as_ref().data_type_and_nullable(col)
1000    }
1001}
1002
1003impl ExprSchema for DFSchema {
1004    fn nullable(&self, col: &Column) -> Result<bool> {
1005        Ok(self.field_from_column(col)?.is_nullable())
1006    }
1007
1008    fn data_type(&self, col: &Column) -> Result<&DataType> {
1009        Ok(self.field_from_column(col)?.data_type())
1010    }
1011
1012    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1013        Ok(self.field_from_column(col)?.metadata())
1014    }
1015
1016    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1017        let field = self.field_from_column(col)?;
1018        Ok((field.data_type(), field.is_nullable()))
1019    }
1020}
1021
/// DataFusion-specific extensions to [`Schema`].
pub trait SchemaExt {
    /// This is a specialized version of Eq that ignores differences
    /// in nullability and metadata.
    ///
    /// It works the same as [`DFSchema::equivalent_names_and_types`].
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns `Ok(())` if the two schemas have the same number of fields and
    /// the fields have, position by position, the same names and logically
    /// equivalent data types. Returns an error otherwise.
    ///
    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
    /// equivalence checking.
    ///
    /// It is only used by insert into cases.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1039
1040impl SchemaExt for Schema {
1041    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1042        if self.fields().len() != other.fields().len() {
1043            return false;
1044        }
1045
1046        self.fields()
1047            .iter()
1048            .zip(other.fields().iter())
1049            .all(|(f1, f2)| {
1050                f1.name() == f2.name()
1051                    && DFSchema::datatype_is_semantically_equal(
1052                        f1.data_type(),
1053                        f2.data_type(),
1054                    )
1055            })
1056    }
1057
1058    // It is only used by insert into cases.
1059    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1060        // case 1 : schema length mismatch
1061        if self.fields().len() != other.fields().len() {
1062            _plan_err!(
1063                "Inserting query must have the same schema length as the table. \
1064            Expected table schema length: {}, got: {}",
1065                self.fields().len(),
1066                other.fields().len()
1067            )
1068        } else {
1069            // case 2 : schema length match, but fields mismatch
1070            // check if the fields name are the same and have the same data types
1071            self.fields()
1072                .iter()
1073                .zip(other.fields().iter())
1074                .try_for_each(|(f1, f2)| {
1075                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1076                        _plan_err!(
1077                            "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
1078                            but got '{}' with type {:?}.",
1079                            f1.name(),
1080                            f1.data_type(),
1081                            f2.name(),
1082                            f2.data_type())
1083                    } else {
1084                        Ok(())
1085                    }
1086                })
1087        }
1088    }
1089}
1090
1091pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1092    match qualifier {
1093        Some(q) => format!("{}.{}", q, name),
1094        None => name.to_string(),
1095    }
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100    use crate::assert_contains;
1101
1102    use super::*;
1103
1104    #[test]
1105    fn qualifier_in_name() -> Result<()> {
1106        let col = Column::from_name("t1.c0");
1107        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1108        // lookup with unqualified name "t1.c0"
1109        let err = schema.index_of_column(&col).unwrap_err();
1110        let expected = "Schema error: No field named \"t1.c0\". \
1111            Column names are case sensitive. \
1112            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1113            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1114            Did you mean 't1.c0'?.";
1115        assert_eq!(err.strip_backtrace(), expected);
1116        Ok(())
1117    }
1118
1119    #[test]
1120    fn quoted_qualifiers_in_name() -> Result<()> {
1121        let col = Column::from_name("t1.c0");
1122        let schema = DFSchema::try_from_qualified_schema(
1123            "t1",
1124            &Schema::new(vec![
1125                Field::new("CapitalColumn", DataType::Boolean, true),
1126                Field::new("field.with.period", DataType::Boolean, true),
1127            ]),
1128        )?;
1129
1130        // lookup with unqualified name "t1.c0"
1131        let err = schema.index_of_column(&col).unwrap_err();
1132        let expected = "Schema error: No field named \"t1.c0\". \
1133            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1134        assert_eq!(err.strip_backtrace(), expected);
1135        Ok(())
1136    }
1137
1138    #[test]
1139    fn from_unqualified_schema() -> Result<()> {
1140        let schema = DFSchema::try_from(test_schema_1())?;
1141        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1142        Ok(())
1143    }
1144
1145    #[test]
1146    fn from_qualified_schema() -> Result<()> {
1147        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1148        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1149        Ok(())
1150    }
1151
1152    #[test]
1153    fn test_from_field_specific_qualified_schema() -> Result<()> {
1154        let schema = DFSchema::from_field_specific_qualified_schema(
1155            vec![Some("t1".into()), None],
1156            &Arc::new(Schema::new(vec![
1157                Field::new("c0", DataType::Boolean, true),
1158                Field::new("c1", DataType::Boolean, true),
1159            ])),
1160        )?;
1161        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1162        Ok(())
1163    }
1164
1165    #[test]
1166    fn test_from_qualified_fields() -> Result<()> {
1167        let schema = DFSchema::new_with_metadata(
1168            vec![
1169                (
1170                    Some("t0".into()),
1171                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1172                ),
1173                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1174            ],
1175            HashMap::new(),
1176        )?;
1177        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1178        Ok(())
1179    }
1180
1181    #[test]
1182    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1183        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1184        let arrow_schema: Schema = schema.into();
1185        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
1186        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
1187        assert_eq!(expected, arrow_schema.to_string());
1188        Ok(())
1189    }
1190
1191    #[test]
1192    fn join_qualified() -> Result<()> {
1193        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1194        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1195        let join = left.join(&right)?;
1196        assert_eq!(
1197            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1198            join.to_string()
1199        );
1200        // test valid access
1201        assert!(join
1202            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1203            .is_ok());
1204        assert!(join
1205            .field_with_qualified_name(&TableReference::bare("t2"), "c0")
1206            .is_ok());
1207        // test invalid access
1208        assert!(join.field_with_unqualified_name("c0").is_err());
1209        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1210        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1211        Ok(())
1212    }
1213
1214    #[test]
1215    fn join_qualified_duplicate() -> Result<()> {
1216        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1217        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1218        let join = left.join(&right);
1219        assert_eq!(
1220            join.unwrap_err().strip_backtrace(),
1221            "Schema error: Schema contains duplicate qualified field name t1.c0",
1222        );
1223        Ok(())
1224    }
1225
1226    #[test]
1227    fn join_unqualified_duplicate() -> Result<()> {
1228        let left = DFSchema::try_from(test_schema_1())?;
1229        let right = DFSchema::try_from(test_schema_1())?;
1230        let join = left.join(&right);
1231        assert_eq!(
1232            join.unwrap_err().strip_backtrace(),
1233            "Schema error: Schema contains duplicate unqualified field name c0"
1234        );
1235        Ok(())
1236    }
1237
1238    #[test]
1239    fn join_mixed() -> Result<()> {
1240        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1241        let right = DFSchema::try_from(test_schema_2())?;
1242        let join = left.join(&right)?;
1243        assert_eq!(
1244            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1245            join.to_string()
1246        );
1247        // test valid access
1248        assert!(join
1249            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1250            .is_ok());
1251        assert!(join.field_with_unqualified_name("c0").is_ok());
1252        assert!(join.field_with_unqualified_name("c100").is_ok());
1253        assert!(join.field_with_name(None, "c100").is_ok());
1254        // test invalid access
1255        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1256        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1257        assert!(join
1258            .field_with_qualified_name(&TableReference::bare(""), "c100")
1259            .is_err());
1260        Ok(())
1261    }
1262
1263    #[test]
1264    fn join_mixed_duplicate() -> Result<()> {
1265        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1266        let right = DFSchema::try_from(test_schema_1())?;
1267        let join = left.join(&right);
1268        assert_contains!(join.unwrap_err().to_string(),
1269                         "Schema error: Schema contains qualified \
1270                          field name t1.c0 and unqualified field name c0 which would be ambiguous");
1271        Ok(())
1272    }
1273
1274    #[test]
1275    fn helpful_error_messages() -> Result<()> {
1276        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1277        let expected_help = "Valid fields are t1.c0, t1.c1.";
1278        assert_contains!(
1279            schema
1280                .field_with_qualified_name(&TableReference::bare("x"), "y")
1281                .unwrap_err()
1282                .to_string(),
1283            expected_help
1284        );
1285        assert_contains!(
1286            schema
1287                .field_with_unqualified_name("y")
1288                .unwrap_err()
1289                .to_string(),
1290            expected_help
1291        );
1292        assert!(schema.index_of_column_by_name(None, "y").is_none());
1293        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1294
1295        Ok(())
1296    }
1297
1298    #[test]
1299    fn select_without_valid_fields() {
1300        let schema = DFSchema::empty();
1301
1302        let col = Column::from_qualified_name("t1.c0");
1303        let err = schema.index_of_column(&col).unwrap_err();
1304        let expected = "Schema error: No field named t1.c0.";
1305        assert_eq!(err.strip_backtrace(), expected);
1306
1307        // the same check without qualifier
1308        let col = Column::from_name("c0");
1309        let err = schema.index_of_column(&col).err().unwrap();
1310        let expected = "Schema error: No field named c0.";
1311        assert_eq!(err.strip_backtrace(), expected);
1312    }
1313
1314    #[test]
1315    fn into() {
1316        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1317        let arrow_schema = Schema::new_with_metadata(
1318            vec![Field::new("c0", DataType::Int64, true)],
1319            test_metadata(),
1320        );
1321        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1322
1323        let df_schema = DFSchema {
1324            inner: Arc::clone(&arrow_schema_ref),
1325            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1326            functional_dependencies: FunctionalDependencies::empty(),
1327        };
1328        let df_schema_ref = Arc::new(df_schema.clone());
1329
1330        {
1331            let arrow_schema = arrow_schema.clone();
1332            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1333
1334            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1335            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1336        }
1337
1338        {
1339            let arrow_schema = arrow_schema.clone();
1340            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1341
1342            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1343            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1344        }
1345
1346        // Now, consume the refs
1347        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1348        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1349    }
1350
1351    fn test_schema_1() -> Schema {
1352        Schema::new(vec![
1353            Field::new("c0", DataType::Boolean, true),
1354            Field::new("c1", DataType::Boolean, true),
1355        ])
1356    }
1357    #[test]
1358    fn test_dfschema_to_schema_conversion() {
1359        let mut a_metadata = HashMap::new();
1360        a_metadata.insert("key".to_string(), "value".to_string());
1361        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1362
1363        let mut b_metadata = HashMap::new();
1364        b_metadata.insert("key".to_string(), "value".to_string());
1365        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1366
1367        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1368
1369        let df_schema = DFSchema {
1370            inner: Arc::clone(&schema),
1371            field_qualifiers: vec![None; schema.fields.len()],
1372            functional_dependencies: FunctionalDependencies::empty(),
1373        };
1374
1375        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1376    }
1377
1378    #[test]
1379    fn test_contain_column() -> Result<()> {
1380        // qualified exists
1381        {
1382            let col = Column::from_qualified_name("t1.c0");
1383            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1384            assert!(schema.is_column_from_schema(&col));
1385        }
1386
1387        // qualified not exists
1388        {
1389            let col = Column::from_qualified_name("t1.c2");
1390            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1391            assert!(!schema.is_column_from_schema(&col));
1392        }
1393
1394        // unqualified exists
1395        {
1396            let col = Column::from_name("c0");
1397            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1398            assert!(schema.is_column_from_schema(&col));
1399        }
1400
1401        // unqualified not exists
1402        {
1403            let col = Column::from_name("c2");
1404            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1405            assert!(!schema.is_column_from_schema(&col));
1406        }
1407
1408        Ok(())
1409    }
1410
    // Exercises `DFSchema::datatype_is_logically_equal` on primitive, list,
    // map and struct types.
    #[test]
    fn test_datatype_is_logically_equal() {
        // Test primitives: identical types are equal, different widths are not
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Int8,
            &DataType::Int8
        ));

        assert!(!DFSchema::datatype_is_logically_equal(
            &DataType::Int8,
            &DataType::Int16
        ));

        // Test lists

        // Succeeds if both have the same element type, disregards names and nullability
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new("element", DataType::Int8, false).into())
        ));

        // Fails if element type is different
        assert!(!DFSchema::datatype_is_logically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
        ));

        // Test maps

        // Reference map type: Int8 keys and Int8 values
        let map_field = DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int8, false),
                    Field::new("value", DataType::Int8, true),
                ])),
                true,
            )
            .into(),
            true,
        );

        // Succeeds if both maps have the same key and value types, disregards names and nullability
        assert!(DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "pairs",
                    DataType::Struct(Fields::from(vec![
                        Field::new("one", DataType::Int8, false),
                        Field::new("two", DataType::Int8, false)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));
        // Fails if value type is different
        assert!(!DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int8, false),
                        Field::new("value", DataType::Int16, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // Fails if key type is different
        assert!(!DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int16, false),
                        Field::new("value", DataType::Int8, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // Test structs

        // Reference struct type: two nullable Int8 fields "a" and "b"
        let struct_field = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int8, true),
            Field::new("b", DataType::Int8, true),
        ]));

        // Succeeds if both have same names and datatypes, ignores nullability
        assert!(DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int8, false),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // Fails if field names are different
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("x", DataType::Int8, true),
                Field::new("y", DataType::Int8, true),
            ]))
        ));

        // Fails if types are different
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int16, true),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // Fails if more or less fields
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
        ));
    }
1541
1542    #[test]
1543    fn test_datatype_is_logically_equivalent_to_dictionary() {
1544        // Dictionary is logically equal to its value type
1545        assert!(DFSchema::datatype_is_logically_equal(
1546            &DataType::Utf8,
1547            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1548        ));
1549    }
1550
1551    #[test]
1552    fn test_datatype_is_semantically_equal() {
1553        assert!(DFSchema::datatype_is_semantically_equal(
1554            &DataType::Int8,
1555            &DataType::Int8
1556        ));
1557
1558        assert!(!DFSchema::datatype_is_semantically_equal(
1559            &DataType::Int8,
1560            &DataType::Int16
1561        ));
1562
1563        // Test lists
1564
1565        // Succeeds if both have the same element type, disregards names and nullability
1566        assert!(DFSchema::datatype_is_semantically_equal(
1567            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1568            &DataType::List(Field::new("element", DataType::Int8, false).into())
1569        ));
1570
1571        // Fails if element type is different
1572        assert!(!DFSchema::datatype_is_semantically_equal(
1573            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1574            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1575        ));
1576
1577        // Test maps
1578        let map_field = DataType::Map(
1579            Field::new(
1580                "entries",
1581                DataType::Struct(Fields::from(vec![
1582                    Field::new("key", DataType::Int8, false),
1583                    Field::new("value", DataType::Int8, true),
1584                ])),
1585                true,
1586            )
1587            .into(),
1588            true,
1589        );
1590
1591        // Succeeds if both maps have the same key and value types, disregards names and nullability
1592        assert!(DFSchema::datatype_is_semantically_equal(
1593            &map_field,
1594            &DataType::Map(
1595                Field::new(
1596                    "pairs",
1597                    DataType::Struct(Fields::from(vec![
1598                        Field::new("one", DataType::Int8, false),
1599                        Field::new("two", DataType::Int8, false)
1600                    ])),
1601                    true
1602                )
1603                .into(),
1604                true
1605            )
1606        ));
1607        // Fails if value type is different
1608        assert!(!DFSchema::datatype_is_semantically_equal(
1609            &map_field,
1610            &DataType::Map(
1611                Field::new(
1612                    "entries",
1613                    DataType::Struct(Fields::from(vec![
1614                        Field::new("key", DataType::Int8, false),
1615                        Field::new("value", DataType::Int16, true)
1616                    ])),
1617                    true
1618                )
1619                .into(),
1620                true
1621            )
1622        ));
1623
1624        // Fails if key type is different
1625        assert!(!DFSchema::datatype_is_semantically_equal(
1626            &map_field,
1627            &DataType::Map(
1628                Field::new(
1629                    "entries",
1630                    DataType::Struct(Fields::from(vec![
1631                        Field::new("key", DataType::Int16, false),
1632                        Field::new("value", DataType::Int8, true)
1633                    ])),
1634                    true
1635                )
1636                .into(),
1637                true
1638            )
1639        ));
1640
1641        // Test structs
1642
1643        let struct_field = DataType::Struct(Fields::from(vec![
1644            Field::new("a", DataType::Int8, true),
1645            Field::new("b", DataType::Int8, true),
1646        ]));
1647
1648        // Succeeds if both have same names and datatypes, ignores nullability
1649        assert!(DFSchema::datatype_is_logically_equal(
1650            &struct_field,
1651            &DataType::Struct(Fields::from(vec![
1652                Field::new("a", DataType::Int8, false),
1653                Field::new("b", DataType::Int8, true),
1654            ]))
1655        ));
1656
1657        // Fails if field names are different
1658        assert!(!DFSchema::datatype_is_logically_equal(
1659            &struct_field,
1660            &DataType::Struct(Fields::from(vec![
1661                Field::new("x", DataType::Int8, true),
1662                Field::new("y", DataType::Int8, true),
1663            ]))
1664        ));
1665
1666        // Fails if types are different
1667        assert!(!DFSchema::datatype_is_logically_equal(
1668            &struct_field,
1669            &DataType::Struct(Fields::from(vec![
1670                Field::new("a", DataType::Int16, true),
1671                Field::new("b", DataType::Int8, true),
1672            ]))
1673        ));
1674
1675        // Fails if more or less fields
1676        assert!(!DFSchema::datatype_is_logically_equal(
1677            &struct_field,
1678            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1679        ));
1680    }
1681
1682    #[test]
1683    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1684        // Dictionary is not semantically equal to its value type
1685        assert!(!DFSchema::datatype_is_semantically_equal(
1686            &DataType::Utf8,
1687            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1688        ));
1689    }
1690
1691    fn test_schema_2() -> Schema {
1692        Schema::new(vec![
1693            Field::new("c100", DataType::Boolean, true),
1694            Field::new("c101", DataType::Boolean, true),
1695        ])
1696    }
1697
1698    fn test_metadata() -> HashMap<String, String> {
1699        test_metadata_n(2)
1700    }
1701
1702    fn test_metadata_n(n: usize) -> HashMap<String, String> {
1703        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1704    }
1705}