datafusion_common/
dfschema.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! DFSchema is an extended schema struct that DataFusion uses to provide support for
//! fields with optional relation names.

use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;

use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
use crate::{
    field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
    SchemaError, TableReference,
};

use arrow::compute::can_cast_types;
use arrow::datatypes::{
    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
};

/// A reference-counted reference to a [DFSchema].
pub type DFSchemaRef = Arc<DFSchema>;

/// DFSchema wraps an Arrow schema and adds relation names.
///
/// The schema may contain fields from multiple tables. Some fields may be
/// qualified and some unqualified. A qualified field is a field that has a
/// relation name associated with it.
///
/// Unqualified fields must be unique not only amongst themselves, but must also
/// have names distinct from any qualified field name. This makes it possible to
/// look up a qualified field by its unqualified name, so long as there aren't
/// multiple qualified fields with the same name.
///
/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
///
/// # Creating qualified schemas
///
/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
/// an Arrow schema.
///
/// ```rust
/// use datafusion_common::{DFSchema, Column};
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let arrow_schema = Schema::new(vec![
///    Field::new("c1", DataType::Int32, false),
/// ]);
///
/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
/// let column = Column::from_qualified_name("t1.c1");
/// assert!(df_schema.has_column(&column));
///
/// // Qualified fields can also be accessed by their unqualified name, if it is unambiguous
/// let column = Column::from_qualified_name("c1");
/// assert!(df_schema.has_column(&column));
/// ```
///
/// # Creating unqualified schemas
///
/// Create an unqualified schema using `TryFrom`:
///
/// ```rust
/// use datafusion_common::{DFSchema, Column};
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let arrow_schema = Schema::new(vec![
///    Field::new("c1", DataType::Int32, false),
/// ]);
///
/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
/// let column = Column::new_unqualified("c1");
/// assert!(df_schema.has_column(&column));
/// ```
///
/// # Converting back to Arrow schema
///
/// Use the `From` / `Into` traits to convert `DFSchema` into an Arrow schema:
///
/// ```rust
/// use datafusion_common::DFSchema;
/// use arrow::datatypes::{Schema, Field};
/// use std::collections::HashMap;
///
/// let df_schema = DFSchema::from_unqualified_fields(vec![
///    Field::new("c1", arrow::datatypes::DataType::Int32, false),
/// ].into(), HashMap::new()).unwrap();
/// let schema = Schema::from(df_schema);
/// assert_eq!(schema.fields().len(), 1);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Inner Arrow schema reference.
    inner: SchemaRef,
    /// Optional qualifiers for each column in this schema, in the same order
    /// as `self.inner.fields()`.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}

impl DFSchema {
    /// Creates an empty `DFSchema`
    pub fn empty() -> Self {
        Self {
            inner: Arc::new(Schema::new([])),
            field_qualifiers: vec![],
            functional_dependencies: FunctionalDependencies::empty(),
        }
    }

    /// Return a reference to the inner Arrow [`Schema`]
    ///
    /// Note this does not have the qualifier information
    pub fn as_arrow(&self) -> &Schema {
        self.inner.as_ref()
    }

    /// Return a reference to the inner Arrow [`SchemaRef`]
    ///
    /// Note this does not have the qualifier information
    pub fn inner(&self) -> &SchemaRef {
        &self.inner
    }

    /// Create a `DFSchema` from a list of Arrow fields, each paired with an
    /// optional qualifier, along with schema-level metadata
    pub fn new_with_metadata(
        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
        metadata: HashMap<String, String>,
    ) -> Result<Self> {
        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
            qualified_fields.into_iter().unzip();

        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));

        let dfschema = Self {
            inner: schema,
            field_qualifiers: qualifiers,
            functional_dependencies: FunctionalDependencies::empty(),
        };
        dfschema.check_names()?;
        Ok(dfschema)
    }

    /// Create a new `DFSchema` from a list of unqualified Arrow [Field]s
    pub fn from_unqualified_fields(
        fields: Fields,
        metadata: HashMap<String, String>,
    ) -> Result<Self> {
        let field_count = fields.len();
        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
        let dfschema = Self {
            inner: schema,
            field_qualifiers: vec![None; field_count],
            functional_dependencies: FunctionalDependencies::empty(),
        };
        dfschema.check_names()?;
        Ok(dfschema)
    }

    /// Create a `DFSchema` from an Arrow schema and a given qualifier
    ///
    /// To create a schema from an Arrow schema without a qualifier, use
    /// `DFSchema::try_from`.
    pub fn try_from_qualified_schema(
        qualifier: impl Into<TableReference>,
        schema: &Schema,
    ) -> Result<Self> {
        let qualifier = qualifier.into();
        let schema = DFSchema {
            inner: schema.clone().into(),
            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
            functional_dependencies: FunctionalDependencies::empty(),
        };
        schema.check_names()?;
        Ok(schema)
    }

    /// Create a `DFSchema` from an Arrow schema, where each field may have a
    /// different (optional) qualifier
    pub fn from_field_specific_qualified_schema(
        qualifiers: Vec<Option<TableReference>>,
        schema: &SchemaRef,
    ) -> Result<Self> {
        let dfschema = Self {
            inner: Arc::clone(schema),
            field_qualifiers: qualifiers,
            functional_dependencies: FunctionalDependencies::empty(),
        };
        dfschema.check_names()?;
        Ok(dfschema)
    }

    /// Return a new schema with the same fields, but with the given per-field
    /// qualifiers.
    pub fn with_field_specific_qualified_schema(
        &self,
        qualifiers: Vec<Option<TableReference>>,
    ) -> Result<Self> {
        if qualifiers.len() != self.fields().len() {
            return _plan_err!(
                "Number of qualifiers must match number of fields. Expected {}, got {}",
                self.fields().len(),
                qualifiers.len()
            );
        }
        Ok(DFSchema {
            inner: Arc::clone(&self.inner),
            field_qualifiers: qualifiers,
            functional_dependencies: self.functional_dependencies.clone(),
        })
    }

    /// Check whether the schema has any duplicate or ambiguous field names
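    ///
    /// This check is run by the `DFSchema` constructors. A minimal illustrative
    /// example (the field names are made up): constructing a schema with two
    /// unqualified fields of the same name fails.
    ///
    /// ```rust
    /// use std::collections::HashMap;
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field};
    ///
    /// let err = DFSchema::from_unqualified_fields(
    ///     vec![
    ///         Field::new("c0", DataType::Int32, false),
    ///         Field::new("c0", DataType::Int32, false),
    ///     ]
    ///     .into(),
    ///     HashMap::new(),
    /// )
    /// .unwrap_err();
    /// assert!(err.to_string().contains("c0"));
    /// ```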
    pub fn check_names(&self) -> Result<()> {
        let mut qualified_names = BTreeSet::new();
        let mut unqualified_names = BTreeSet::new();

        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
            if let Some(qualifier) = qualifier {
                if !qualified_names.insert((qualifier, field.name())) {
                    return _schema_err!(SchemaError::DuplicateQualifiedField {
                        qualifier: Box::new(qualifier.clone()),
                        name: field.name().to_string(),
                    });
                }
            } else if !unqualified_names.insert(field.name()) {
                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
                    name: field.name().to_string()
                });
            }
        }

        for (qualifier, name) in qualified_names {
            if unqualified_names.contains(name) {
                return _schema_err!(SchemaError::AmbiguousReference {
                    field: Box::new(Column::new(Some(qualifier.clone()), name))
                });
            }
        }
        Ok(())
    }

    /// Assigns functional dependencies.
    pub fn with_functional_dependencies(
        mut self,
        functional_dependencies: FunctionalDependencies,
    ) -> Result<Self> {
        if functional_dependencies.is_valid(self.inner.fields.len()) {
            self.functional_dependencies = functional_dependencies;
            Ok(self)
        } else {
            _plan_err!(
                "Invalid functional dependency: {:?}",
                functional_dependencies
            )
        }
    }

    /// Create a new schema that contains the fields from this schema followed by the fields
    /// from the supplied schema. An error will be returned if there are duplicate field names.
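    ///
    /// A minimal illustrative sketch (the table names `t1`/`t2` and column `c0`
    /// are made up for the example):
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int32, false)]);
    /// let left = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
    /// let right = DFSchema::try_from_qualified_schema("t2", &arrow_schema).unwrap();
    /// let joined = left.join(&right).unwrap();
    /// assert_eq!(joined.field_names(), ["t1.c0", "t2.c0"]);
    /// ```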
    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
        let mut schema_builder = SchemaBuilder::new();
        schema_builder.extend(self.inner.fields().iter().cloned());
        schema_builder.extend(schema.fields().iter().cloned());
        let new_schema = schema_builder.finish();

        let mut new_metadata = self.inner.metadata.clone();
        new_metadata.extend(schema.inner.metadata.clone());
        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);

        let mut new_qualifiers = self.field_qualifiers.clone();
        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());

        let new_self = Self {
            inner: Arc::new(new_schema_with_metadata),
            field_qualifiers: new_qualifiers,
            functional_dependencies: FunctionalDependencies::empty(),
        };
        new_self.check_names()?;
        Ok(new_self)
    }

    /// Modify this schema by appending the fields from the supplied schema, ignoring any
    /// duplicate fields.
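    ///
    /// A minimal illustrative sketch (the qualifier `t1` and the columns are made
    /// up): fields already present in `self` are skipped.
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let mut schema = DFSchema::try_from_qualified_schema(
    ///     "t1",
    ///     &Schema::new(vec![Field::new("c0", DataType::Int32, false)]),
    /// )
    /// .unwrap();
    /// let other = DFSchema::try_from_qualified_schema(
    ///     "t1",
    ///     &Schema::new(vec![
    ///         Field::new("c0", DataType::Int32, false), // duplicate, ignored
    ///         Field::new("c1", DataType::Utf8, true),
    ///     ]),
    /// )
    /// .unwrap();
    /// schema.merge(&other);
    /// assert_eq!(schema.field_names(), ["t1.c0", "t1.c1"]);
    /// ```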
    pub fn merge(&mut self, other_schema: &DFSchema) {
        if other_schema.inner.fields.is_empty() {
            return;
        }

        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
            self.iter().collect();
        let self_unqualified_names: HashSet<&str> = self
            .inner
            .fields
            .iter()
            .map(|field| field.name().as_str())
            .collect();

        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
        let mut qualifiers = Vec::new();
        for (qualifier, field) in other_schema.iter() {
            // skip duplicate columns
            let duplicated_field = match qualifier {
                Some(q) => self_fields.contains(&(Some(q), field)),
                // for unqualified columns, check as unqualified name
                None => self_unqualified_names.contains(field.name().as_str()),
            };
            if !duplicated_field {
                schema_builder.push(Arc::clone(field));
                qualifiers.push(qualifier.cloned());
            }
        }
        let mut metadata = self.inner.metadata.clone();
        metadata.extend(other_schema.inner.metadata.clone());

        let finished = schema_builder.finish();
        let finished_with_metadata = finished.with_metadata(metadata);
        self.inner = finished_with_metadata.into();
        self.field_qualifiers.extend(qualifiers);
    }

    /// Get a list of fields
    pub fn fields(&self) -> &Fields {
        &self.inner.fields
    }

    /// Returns an immutable reference to a specific `Field` instance selected
    /// using an offset within the internal `fields` vector
    pub fn field(&self, i: usize) -> &Field {
        &self.inner.fields[i]
    }

    /// Returns an immutable reference to a specific `Field` instance selected
    /// using an offset within the internal `fields` vector, along with its qualifier
    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
        (self.field_qualifiers[i].as_ref(), self.field(i))
    }

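    /// Find the index of the field with the given optional qualifier and name,
    /// returning `None` if no matching field exists.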
    pub fn index_of_column_by_name(
        &self,
        qualifier: Option<&TableReference>,
        name: &str,
    ) -> Option<usize> {
        let mut matches = self
            .iter()
            .enumerate()
            .filter(|(_, (q, f))| match (qualifier, q) {
                // field to lookup is qualified.
                // current field is qualified and not shared between relations, compare both
                // qualifier and name.
                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
                // field to lookup is qualified but current field is unqualified.
                (Some(_), None) => false,
                // field to lookup is unqualified, no need to compare qualifier
                (None, Some(_)) | (None, None) => f.name() == name,
            })
            .map(|(idx, _)| idx);
        matches.next()
    }

    /// Find the index of the column with the given qualifier and name,
    /// returning `None` if not found
    ///
    /// See [Self::index_of_column] for a version that returns an error if the
    /// column is not found
    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
    }

    /// Find the index of the column with the given qualifier and name,
    /// returning `Err` if not found
    ///
    /// See [Self::maybe_index_of_column] for a version that returns `None` if
    /// the column is not found
    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
        self.maybe_index_of_column(col)
            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
    }

    /// Check if the column is in the current schema
    pub fn is_column_from_schema(&self, col: &Column) -> bool {
        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
            .is_some()
    }

    /// Find the field with the given name
    pub fn field_with_name(
        &self,
        qualifier: Option<&TableReference>,
        name: &str,
    ) -> Result<&Field> {
        if let Some(qualifier) = qualifier {
            self.field_with_qualified_name(qualifier, name)
        } else {
            self.field_with_unqualified_name(name)
        }
    }

    /// Find the qualified field with the given name
    pub fn qualified_field_with_name(
        &self,
        qualifier: Option<&TableReference>,
        name: &str,
    ) -> Result<(Option<&TableReference>, &Field)> {
        if let Some(qualifier) = qualifier {
            let idx = self
                .index_of_column_by_name(Some(qualifier), name)
                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
        } else {
            self.qualified_field_with_unqualified_name(name)
        }
    }

    /// Find all fields having the given qualifier
    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
        self.iter()
            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
            .map(|(_, f)| f.as_ref())
            .collect()
    }

    /// Find the indices of all fields having the given qualifier
    pub fn fields_indices_with_qualified(
        &self,
        qualifier: &TableReference,
    ) -> Vec<usize> {
        self.iter()
            .enumerate()
            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
            .collect()
    }

    /// Find all fields that match the given unqualified name
    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
        self.fields()
            .iter()
            .filter(|field| field.name() == name)
            .map(|f| f.as_ref())
            .collect()
    }

    /// Find all fields that match the given unqualified name and return them
    /// with their qualifier
    pub fn qualified_fields_with_unqualified_name(
        &self,
        name: &str,
    ) -> Vec<(Option<&TableReference>, &Field)> {
        self.iter()
            .filter(|(_, field)| field.name() == name)
            .map(|(qualifier, field)| (qualifier, field.as_ref()))
            .collect()
    }

    /// Find all fields that match the given unqualified name and return them as `Column`s
    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
        self.iter()
            .filter(|(_, field)| field.name() == name)
            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
            .collect()
    }

    /// Return all `Column`s for the schema
    pub fn columns(&self) -> Vec<Column> {
        self.iter()
            .map(|(qualifier, field)| {
                Column::new(qualifier.cloned(), field.name().clone())
            })
            .collect()
    }

    /// Find the qualified field with the given unqualified name
    pub fn qualified_field_with_unqualified_name(
        &self,
        name: &str,
    ) -> Result<(Option<&TableReference>, &Field)> {
        let matches = self.qualified_fields_with_unqualified_name(name);
        match matches.len() {
            0 => Err(unqualified_field_not_found(name, self)),
            1 => Ok((matches[0].0, matches[0].1)),
            _ => {
                // More than one match does not necessarily mean the name is ambiguous:
                // the name may come from an alias (or similar), in which case it carries
                // no qualifier. For example:
                //             Join on id = b.id
                // Project a.id as id   TableScan b id
                // Here there is no ambiguity. When exactly one of the matches has no
                // qualifier, we should return it.
                let fields_without_qualifier = matches
                    .iter()
                    .filter(|(q, _)| q.is_none())
                    .collect::<Vec<_>>();
                if fields_without_qualifier.len() == 1 {
                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
                } else {
                    _schema_err!(SchemaError::AmbiguousReference {
                        field: Box::new(Column::new_unqualified(name.to_string()))
                    })
                }
            }
        }
    }

    /// Find the field with the given name
    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
        self.qualified_field_with_unqualified_name(name)
            .map(|(_, field)| field)
    }

    /// Find the field with the given qualified name
    pub fn field_with_qualified_name(
        &self,
        qualifier: &TableReference,
        name: &str,
    ) -> Result<&Field> {
        let idx = self
            .index_of_column_by_name(Some(qualifier), name)
            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;

        Ok(self.field(idx))
    }

    /// Find the field with the given qualified column
    pub fn qualified_field_from_column(
        &self,
        column: &Column,
    ) -> Result<(Option<&TableReference>, &Field)> {
        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
    }

    /// Check whether a field with the given unqualified name exists
    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
        self.fields().iter().any(|field| field.name() == name)
    }

    /// Check whether a field with the given qualified name exists
    pub fn has_column_with_qualified_name(
        &self,
        qualifier: &TableReference,
        name: &str,
    ) -> bool {
        self.iter()
            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
    }

    /// Check whether a field exists for the given qualified column
    pub fn has_column(&self, column: &Column) -> bool {
        match &column.relation {
            Some(r) => self.has_column_with_qualified_name(r, &column.name),
            None => self.has_column_with_unqualified_name(&column.name),
        }
    }

    /// Check whether the unqualified field names match the field names in the given Arrow schema
    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
        self.inner
            .fields
            .iter()
            .zip(arrow_schema.fields().iter())
            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
    }

    /// Check whether the fields in two Arrow schemas are compatible
    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
    pub fn check_arrow_schema_type_compatible(
        &self,
        arrow_schema: &Schema,
    ) -> Result<()> {
        let self_arrow_schema: Schema = self.into();
        self_arrow_schema
            .fields()
            .iter()
            .zip(arrow_schema.fields().iter())
            .try_for_each(|(l_field, r_field)| {
                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
                                r_field.name(),
                                r_field.data_type(),
                                l_field.name(),
                                l_field.data_type())
                } else {
                    Ok(())
                }
            })
    }

    /// Returns true if the two schemas have fields with the same qualified names
    /// and logically equivalent data types. Returns false otherwise.
    ///
    /// Use [DFSchema]::has_equivalent_names_and_types for stricter semantic type
    /// equivalence checking.
    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
        if self.fields().len() != other.fields().len() {
            return false;
        }
        let self_fields = self.iter();
        let other_fields = other.iter();
        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
            q1 == q2
                && f1.name() == f2.name()
                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
        })
    }

    #[deprecated(since = "47.0.0", note = "Use `has_equivalent_names_and_types` instead")]
    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
        self.has_equivalent_names_and_types(other).is_ok()
    }

    /// Returns `Ok(())` if the two schemas have the same qualified named
    /// fields with compatible data types.
    ///
    /// Returns an `Err` with a message otherwise.
    ///
    /// This is a specialized version of Eq that ignores differences in
    /// nullability and metadata.
    ///
    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
    /// logical type check, which for example would consider a dictionary
    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
        // case 1 : schema length mismatch
        if self.fields().len() != other.fields().len() {
            _plan_err!(
                "Schema mismatch: the schema lengths do not match. \
            Expected schema length: {}, got: {}",
                self.fields().len(),
                other.fields().len()
            )
        } else {
            // case 2 : schema length match, but fields mismatch
            // check if the field names are the same and the fields have the same data types
            self.fields()
                .iter()
                .zip(other.fields().iter())
                .try_for_each(|(f1, f2)| {
                    if f1.name() != f2.name()
                        || (!DFSchema::datatype_is_semantically_equal(
                            f1.data_type(),
                            f2.data_type(),
                        ))
                    {
                        _plan_err!(
                            "Schema mismatch: Expected field '{}' with type {:?}, \
                            but got '{}' with type {:?}.",
                            f1.name(),
                            f1.data_type(),
                            f2.name(),
                            f2.data_type()
                        )
                    } else {
                        Ok(())
                    }
                })
        }
    }

    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
    /// than datatype_is_semantically_equal in that different representations of the same data
    /// can be logically but not semantically equivalent. Semantically equivalent types are
    /// always also logically equivalent. For example:
    /// - a Dictionary<K,V> type is logically equal to a plain V type
    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
    /// - Utf8 and Utf8View are logically equal
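    ///
    /// A small illustrative sketch of the distinction:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::DataType;
    ///
    /// // A dictionary-encoded Utf8 column is logically, but not semantically,
    /// // equal to a plain Utf8 column
    /// let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    /// assert!(DFSchema::datatype_is_logically_equal(&dict, &DataType::Utf8));
    /// assert!(!DFSchema::datatype_is_semantically_equal(&dict, &DataType::Utf8));
    ///
    /// // Utf8 and Utf8View are logically equal
    /// assert!(DFSchema::datatype_is_logically_equal(&DataType::Utf8, &DataType::Utf8View));
    /// assert!(!DFSchema::datatype_is_logically_equal(&DataType::Utf8, &DataType::Int32));
    /// ```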
    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
        // check nested fields
        match (dt1, dt2) {
            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
                v1.as_ref() == v2.as_ref()
            }
            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
            (DataType::List(f1), DataType::List(f2))
            | (DataType::LargeList(f1), DataType::LargeList(f2))
            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
                // Don't compare the names of the technical inner field
                // Usually "item" but that's not mandated
                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
            }
            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
                // Don't compare the names of the technical inner fields
                // Usually "entries", "key", "value" but that's not mandated
                match (f1.data_type(), f2.data_type()) {
                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                        f1_inner.len() == f2_inner.len()
                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                                Self::datatype_is_logically_equal(
                                    f1.data_type(),
                                    f2.data_type(),
                                )
                            })
                    }
                    _ => panic!("Map type should have an inner struct field"),
                }
            }
            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    // all fields have to be the same
                    iter1
                        .zip(iter2)
                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
            }
            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    // all fields have to be the same
                    iter1
                        .zip(iter2)
                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
            }
            // Utf8 and Utf8View are logically equivalent
            (DataType::Utf8, DataType::Utf8View) => true,
            (DataType::Utf8View, DataType::Utf8) => true,
            _ => Self::datatype_is_semantically_equal(dt1, dt2),
        }
    }

    /// Returns true if two [`DataType`]s are semantically equal (same
    /// name and type), ignoring metadata, nullability, and decimal precision/scale.
    ///
    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
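    ///
    /// An illustrative example of the decimal special case:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::DataType;
    ///
    /// // decimal precision/scale is ignored
    /// assert!(DFSchema::datatype_is_semantically_equal(
    ///     &DataType::Decimal128(10, 2),
    ///     &DataType::Decimal128(38, 10),
    /// ));
    /// // different physical types are not equal
    /// assert!(!DFSchema::datatype_is_semantically_equal(
    ///     &DataType::Int32,
    ///     &DataType::Int64,
    /// ));
    /// ```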
    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
        // check nested fields
        match (dt1, dt2) {
            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
            }
            (DataType::List(f1), DataType::List(f2))
            | (DataType::LargeList(f1), DataType::LargeList(f2))
            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
                // Don't compare the names of the technical inner field
                // Usually "item" but that's not mandated
                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
            }
            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
                // Don't compare the names of the technical inner fields
                // Usually "entries", "key", "value" but that's not mandated
                match (f1.data_type(), f2.data_type()) {
                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                        f1_inner.len() == f2_inner.len()
                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                                Self::datatype_is_semantically_equal(
                                    f1.data_type(),
                                    f2.data_type(),
                                )
                            })
                    }
                    _ => panic!("Map type should have an inner struct field"),
                }
            }
            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    // all fields have to be the same
                    iter1
                        .zip(iter2)
                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
            }
            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    // all fields have to be the same
                    iter1
                        .zip(iter2)
                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
            }
            (
                DataType::Decimal128(_l_precision, _l_scale),
                DataType::Decimal128(_r_precision, _r_scale),
            ) => true,
            (
                DataType::Decimal256(_l_precision, _l_scale),
                DataType::Decimal256(_r_precision, _r_scale),
            ) => true,
            _ => dt1 == dt2,
        }
    }

    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
        f1.name() == f2.name()
            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
    }

    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
        f1.name() == f2.name()
            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
    }

    /// Strip all field qualifiers from the schema
    pub fn strip_qualifiers(self) -> Self {
        DFSchema {
            field_qualifiers: vec![None; self.inner.fields.len()],
            inner: self.inner,
            functional_dependencies: self.functional_dependencies,
        }
    }

    /// Replace all field qualifiers with the given value
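    ///
    /// A minimal illustrative sketch (the qualifier `t1` and column `c0` are made up):
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let schema =
    ///     DFSchema::try_from(Schema::new(vec![Field::new("c0", DataType::Int32, false)]))
    ///         .unwrap();
    /// let qualified = schema.replace_qualifier("t1");
    /// assert_eq!(qualified.field_names(), ["t1.c0"]);
    /// ```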
    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
        let qualifier = qualifier.into();
        DFSchema {
            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
            inner: self.inner,
            functional_dependencies: self.functional_dependencies,
        }
    }

    /// Get list of fully-qualified field names in this schema
    pub fn field_names(&self) -> Vec<String> {
        self.iter()
            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
            .collect::<Vec<_>>()
    }

    /// Get metadata of this schema
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.inner.metadata
    }

    /// Get functional dependencies
    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
        &self.functional_dependencies
    }

    /// Iterate over the qualifiers and fields in the DFSchema
    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
        self.field_qualifiers
            .iter()
            .zip(self.inner.fields().iter())
            .map(|(qualifier, field)| (qualifier.as_ref(), field))
    }
}

impl From<DFSchema> for Schema {
    /// Convert DFSchema into a Schema
    fn from(df_schema: DFSchema) -> Self {
        let fields: Fields = df_schema.inner.fields.clone();
        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
    }
}

impl From<&DFSchema> for Schema {
    /// Convert DFSchema reference into a Schema
    fn from(df_schema: &DFSchema) -> Self {
        let fields: Fields = df_schema.inner.fields.clone();
        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
    }
}

/// Allow DFSchema to be converted into an Arrow `&Schema`
impl AsRef<Schema> for DFSchema {
    fn as_ref(&self) -> &Schema {
        self.as_arrow()
    }
}

/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
/// example)
impl AsRef<SchemaRef> for DFSchema {
    fn as_ref(&self) -> &SchemaRef {
        self.inner()
    }
}

/// Create a `DFSchema` from an Arrow schema
impl TryFrom<Schema> for DFSchema {
    type Error = DataFusionError;
    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
        Self::try_from(Arc::new(schema))
    }
}

impl TryFrom<SchemaRef> for DFSchema {
    type Error = DataFusionError;
    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
        let field_count = schema.fields.len();
        let dfschema = Self {
            inner: schema,
            field_qualifiers: vec![None; field_count],
            functional_dependencies: FunctionalDependencies::empty(),
        };
        Ok(dfschema)
    }
}

impl From<DFSchema> for SchemaRef {
    fn from(df_schema: DFSchema) -> Self {
        SchemaRef::new(df_schema.into())
    }
}

// Hashing covers only a subset of the fields compared in PartialEq
// (the metadata HashMap is not hashable), which keeps it consistent with Eq.
impl Hash for DFSchema {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.inner.fields.hash(state);
        self.inner.metadata.len().hash(state); // HashMap is not hashable
    }
}

/// Convenience trait to convert Schema-like things to DFSchema and DFSchemaRef with fewer keystrokes
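///
/// An illustrative sketch, assuming the trait is exported from the crate root
/// like [`DFSchema`] is:
///
/// ```rust
/// use datafusion_common::ToDFSchema;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("c0", DataType::Int32, false)]);
/// let df_schema_ref = schema.to_dfschema_ref().unwrap();
/// assert_eq!(df_schema_ref.field_names(), ["c0"]);
/// ```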
pub trait ToDFSchema
where
    Self: Sized,
{
    /// Attempt to create a DFSchema
    fn to_dfschema(self) -> Result<DFSchema>;

    /// Attempt to create a DFSchemaRef
    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
        Ok(Arc::new(self.to_dfschema()?))
    }
}

impl ToDFSchema for Schema {
    fn to_dfschema(self) -> Result<DFSchema> {
        DFSchema::try_from(self)
    }
}

impl ToDFSchema for SchemaRef {
    fn to_dfschema(self) -> Result<DFSchema> {
        DFSchema::try_from(self)
    }
}

impl ToDFSchema for Vec<Field> {
    fn to_dfschema(self) -> Result<DFSchema> {
        let field_count = self.len();
        let schema = Schema {
            fields: self.into(),
            metadata: HashMap::new(),
        };
        let dfschema = DFSchema {
            inner: schema.into(),
            field_qualifiers: vec![None; field_count],
            functional_dependencies: FunctionalDependencies::empty(),
        };
        Ok(dfschema)
    }
}

impl Display for DFSchema {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        write!(
            f,
            "fields:[{}], metadata:{:?}",
            self.iter()
                .map(|(q, f)| qualified_name(q, f.name()))
                .collect::<Vec<String>>()
                .join(", "),
            self.inner.metadata
        )
    }
}

/// Provides schema information needed by certain methods of `Expr`
/// (defined in the datafusion-common crate).
///
/// Note that this trait is implemented for &[DFSchema] which is
/// widely used in the DataFusion codebase.
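///
/// An illustrative sketch, assuming the trait is exported from the crate root
/// like [`DFSchema`] is:
///
/// ```rust
/// use datafusion_common::{Column, DFSchema, ExprSchema};
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = DFSchema::try_from_qualified_schema(
///     "t1",
///     &Schema::new(vec![Field::new("c1", DataType::Int32, true)]),
/// )
/// .unwrap();
/// let col = Column::from_qualified_name("t1.c1");
/// assert_eq!(schema.data_type(&col).unwrap(), &DataType::Int32);
/// assert!(schema.nullable(&col).unwrap());
/// ```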
pub trait ExprSchema: std::fmt::Debug {
    /// Is this column reference nullable?
    fn nullable(&self, col: &Column) -> Result<bool> {
        Ok(self.field_from_column(col)?.is_nullable())
    }

    /// What is the datatype of this column?
    fn data_type(&self, col: &Column) -> Result<&DataType> {
        Ok(self.field_from_column(col)?.data_type())
    }

    /// Returns the column's optional metadata.
    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
        Ok(self.field_from_column(col)?.metadata())
    }

    /// Return the column's datatype and nullability
    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
        let field = self.field_from_column(col)?;
        Ok((field.data_type(), field.is_nullable()))
    }

    /// Return the column's field
    fn field_from_column(&self, col: &Column) -> Result<&Field>;
}

// Implement `ExprSchema` for types that can be dereferenced to a `DFSchema`,
// such as `Arc<DFSchema>`
impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
    fn nullable(&self, col: &Column) -> Result<bool> {
        self.as_ref().nullable(col)
    }

    fn data_type(&self, col: &Column) -> Result<&DataType> {
        self.as_ref().data_type(col)
    }

    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
        ExprSchema::metadata(self.as_ref(), col)
    }

    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
        self.as_ref().data_type_and_nullable(col)
    }

    fn field_from_column(&self, col: &Column) -> Result<&Field> {
        self.as_ref().field_from_column(col)
    }
}

impl ExprSchema for DFSchema {
    fn field_from_column(&self, col: &Column) -> Result<&Field> {
        match &col.relation {
            Some(r) => self.field_with_qualified_name(r, &col.name),
            None => self.field_with_unqualified_name(&col.name),
        }
    }
}

/// DataFusion-specific extensions to [`Schema`].
pub trait SchemaExt {
    /// This is a specialized version of Eq that ignores differences
    /// in nullability and metadata.
    ///
    /// It works the same as [`DFSchema::equivalent_names_and_types`].
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns `Ok(())` if the two schemas have the same qualified named
    /// fields with logically equivalent data types. Returns a plan error otherwise.
    ///
    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
    /// equivalence checking.
    ///
    /// It is only used for INSERT INTO cases.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}

impl SchemaExt for Schema {
    fn equivalent_names_and_types(&self, other: &Self) -> bool {
        if self.fields().len() != other.fields().len() {
            return false;
        }

        self.fields()
            .iter()
            .zip(other.fields().iter())
            .all(|(f1, f2)| {
                f1.name() == f2.name()
                    && DFSchema::datatype_is_semantically_equal(
                        f1.data_type(),
                        f2.data_type(),
                    )
            })
    }

    // It is only used for INSERT INTO cases.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
        // case 1 : schema length mismatch
        if self.fields().len() != other.fields().len() {
            _plan_err!(
                "Inserting query must have the same schema length as the table. \
            Expected table schema length: {}, got: {}",
                self.fields().len(),
                other.fields().len()
            )
        } else {
            // case 2 : schema length match, but fields mismatch
            // check if the field names are the same and the fields have the same data types
            self.fields()
                .iter()
                .zip(other.fields().iter())
                .try_for_each(|(f1, f2)| {
                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
                        _plan_err!(
                            "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
                            but got '{}' with type {:?}.",
                            f1.name(),
                            f1.data_type(),
                            f2.name(),
                            f2.data_type())
                    } else {
                        Ok(())
                    }
                })
        }
    }
}

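/// Format an optional qualifier and a field name as a single string:
/// `"qualifier.name"` when a qualifier is present, otherwise just `"name"`.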
pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
    match qualifier {
        Some(q) => format!("{q}.{name}"),
        None => name.to_string(),
    }
}

#[cfg(test)]
mod tests {
    use crate::assert_contains;

    use super::*;

    #[test]
    fn qualifier_in_name() -> Result<()> {
        let col = Column::from_name("t1.c0");
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        // lookup with unqualified name "t1.c0"
        let err = schema.index_of_column(&col).unwrap_err();
        let expected = "Schema error: No field named \"t1.c0\". \
            Column names are case sensitive. \
            You can use double quotes to refer to the \"\"t1.c0\"\" column \
            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
            Did you mean 't1.c0'?.";
        assert_eq!(err.strip_backtrace(), expected);
        Ok(())
    }

    #[test]
    fn quoted_qualifiers_in_name() -> Result<()> {
        let col = Column::from_name("t1.c0");
        let schema = DFSchema::try_from_qualified_schema(
            "t1",
            &Schema::new(vec![
                Field::new("CapitalColumn", DataType::Boolean, true),
                Field::new("field.with.period", DataType::Boolean, true),
            ]),
        )?;

        // lookup with unqualified name "t1.c0"
        let err = schema.index_of_column(&col).unwrap_err();
        let expected = "Schema error: No field named \"t1.c0\". \
            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
        assert_eq!(err.strip_backtrace(), expected);
        Ok(())
    }

    #[test]
    fn from_unqualified_schema() -> Result<()> {
        let schema = DFSchema::try_from(test_schema_1())?;
        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
        Ok(())
    }

    #[test]
    fn from_qualified_schema() -> Result<()> {
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
        Ok(())
    }

    #[test]
    fn test_from_field_specific_qualified_schema() -> Result<()> {
        let schema = DFSchema::from_field_specific_qualified_schema(
            vec![Some("t1".into()), None],
            &Arc::new(Schema::new(vec![
                Field::new("c0", DataType::Boolean, true),
                Field::new("c1", DataType::Boolean, true),
            ])),
        )?;
        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
        Ok(())
    }

    #[test]
    fn test_from_qualified_fields() -> Result<()> {
        let schema = DFSchema::new_with_metadata(
            vec![
                (
                    Some("t0".into()),
                    Arc::new(Field::new("c0", DataType::Boolean, true)),
                ),
                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
            ],
            HashMap::new(),
        )?;
        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
        Ok(())
    }

    #[test]
    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let arrow_schema: Schema = schema.into();
        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
        assert_eq!(expected, arrow_schema.to_string());
        Ok(())
    }

    #[test]
    fn join_qualified() -> Result<()> {
        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
        let join = left.join(&right)?;
        assert_eq!(
            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
            join.to_string()
        );
        // test valid access
        assert!(join
            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
            .is_ok());
        assert!(join
            .field_with_qualified_name(&TableReference::bare("t2"), "c0")
            .is_ok());
        // test invalid access
        assert!(join.field_with_unqualified_name("c0").is_err());
        assert!(join.field_with_unqualified_name("t1.c0").is_err());
        assert!(join.field_with_unqualified_name("t2.c0").is_err());
        Ok(())
    }

    #[test]
    fn join_qualified_duplicate() -> Result<()> {
        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let join = left.join(&right);
        assert_eq!(
            join.unwrap_err().strip_backtrace(),
            "Schema error: Schema contains duplicate qualified field name t1.c0",
        );
        Ok(())
    }

    #[test]
    fn join_unqualified_duplicate() -> Result<()> {
        let left = DFSchema::try_from(test_schema_1())?;
        let right = DFSchema::try_from(test_schema_1())?;
        let join = left.join(&right);
        assert_eq!(
            join.unwrap_err().strip_backtrace(),
            "Schema error: Schema contains duplicate unqualified field name c0"
        );
        Ok(())
    }

    #[test]
    fn join_mixed() -> Result<()> {
        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let right = DFSchema::try_from(test_schema_2())?;
        let join = left.join(&right)?;
        assert_eq!(
            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
            join.to_string()
        );
        // test valid access
        assert!(join
            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
            .is_ok());
        assert!(join.field_with_unqualified_name("c0").is_ok());
        assert!(join.field_with_unqualified_name("c100").is_ok());
        assert!(join.field_with_name(None, "c100").is_ok());
        // test invalid access
        assert!(join.field_with_unqualified_name("t1.c0").is_err());
        assert!(join.field_with_unqualified_name("t1.c100").is_err());
        assert!(join
            .field_with_qualified_name(&TableReference::bare(""), "c100")
            .is_err());
        Ok(())
    }

    #[test]
    fn join_mixed_duplicate() -> Result<()> {
        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let right = DFSchema::try_from(test_schema_1())?;
        let join = left.join(&right);
        assert_contains!(join.unwrap_err().to_string(),
                         "Schema error: Schema contains qualified \
                          field name t1.c0 and unqualified field name c0 which would be ambiguous");
        Ok(())
    }

    #[test]
    fn helpful_error_messages() -> Result<()> {
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let expected_help = "Valid fields are t1.c0, t1.c1.";
        assert_contains!(
            schema
                .field_with_qualified_name(&TableReference::bare("x"), "y")
                .unwrap_err()
                .to_string(),
            expected_help
        );
        assert_contains!(
            schema
                .field_with_unqualified_name("y")
                .unwrap_err()
                .to_string(),
            expected_help
        );
        assert!(schema.index_of_column_by_name(None, "y").is_none());
        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());

        Ok(())
    }

    #[test]
    fn select_without_valid_fields() {
        let schema = DFSchema::empty();

        let col = Column::from_qualified_name("t1.c0");
        let err = schema.index_of_column(&col).unwrap_err();
        let expected = "Schema error: No field named t1.c0.";
        assert_eq!(err.strip_backtrace(), expected);

        // the same check without qualifier
        let col = Column::from_name("c0");
        let err = schema.index_of_column(&col).err().unwrap();
        let expected = "Schema error: No field named c0.";
        assert_eq!(err.strip_backtrace(), expected);
    }

    #[test]
    fn into() {
        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
        let arrow_schema = Schema::new_with_metadata(
            vec![Field::new("c0", DataType::Int64, true)],
            test_metadata(),
        );
        let arrow_schema_ref = Arc::new(arrow_schema.clone());

        let df_schema = DFSchema {
            inner: Arc::clone(&arrow_schema_ref),
            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
            functional_dependencies: FunctionalDependencies::empty(),
        };
        let df_schema_ref = Arc::new(df_schema.clone());

        {
            let arrow_schema = arrow_schema.clone();
            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);

            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
        }

        {
            let arrow_schema = arrow_schema.clone();
            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);

            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
        }

        // Now, consume the refs
        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
    }
1367
1368    fn test_schema_1() -> Schema {
1369        Schema::new(vec![
1370            Field::new("c0", DataType::Boolean, true),
1371            Field::new("c1", DataType::Boolean, true),
1372        ])
1373    }
1374    #[test]
1375    fn test_dfschema_to_schema_conversion() {
1376        let mut a_metadata = HashMap::new();
1377        a_metadata.insert("key".to_string(), "value".to_string());
1378        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1379
1380        let mut b_metadata = HashMap::new();
1381        b_metadata.insert("key".to_string(), "value".to_string());
1382        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1383
1384        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1385
1386        let df_schema = DFSchema {
1387            inner: Arc::clone(&schema),
1388            field_qualifiers: vec![None; schema.fields.len()],
1389            functional_dependencies: FunctionalDependencies::empty(),
1390        };
1391
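        // Wrapping the Arrow schema in a DFSchema must leave the schema-level metadata untouched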
1392        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1393    }
1394
1395    #[test]
1396    fn test_contain_column() -> Result<()> {
1397        // qualified exists
1398        {
1399            let col = Column::from_qualified_name("t1.c0");
1400            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1401            assert!(schema.is_column_from_schema(&col));
1402        }
1403
1404        // qualified not exists
1405        {
1406            let col = Column::from_qualified_name("t1.c2");
1407            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1408            assert!(!schema.is_column_from_schema(&col));
1409        }
1410
1411        // unqualified exists
1412        {
1413            let col = Column::from_name("c0");
1414            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1415            assert!(schema.is_column_from_schema(&col));
1416        }
1417
1418        // unqualified not exists
1419        {
1420            let col = Column::from_name("c2");
1421            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1422            assert!(!schema.is_column_from_schema(&col));
1423        }
1424
1425        Ok(())
1426    }
1427
1428    #[test]
1429    fn test_datatype_is_logically_equal() {
1430        assert!(DFSchema::datatype_is_logically_equal(
1431            &DataType::Int8,
1432            &DataType::Int8
1433        ));
1434
1435        assert!(!DFSchema::datatype_is_logically_equal(
1436            &DataType::Int8,
1437            &DataType::Int16
1438        ));
1439
1440        // Test lists
1441
1442        // Succeeds if both have the same element type, disregards names and nullability
1443        assert!(DFSchema::datatype_is_logically_equal(
1444            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1445            &DataType::List(Field::new("element", DataType::Int8, false).into())
1446        ));
1447
1448        // Fails if element type is different
1449        assert!(!DFSchema::datatype_is_logically_equal(
1450            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1451            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1452        ));
1453
1454        // Test maps
1455        let map_field = DataType::Map(
1456            Field::new(
1457                "entries",
1458                DataType::Struct(Fields::from(vec![
1459                    Field::new("key", DataType::Int8, false),
1460                    Field::new("value", DataType::Int8, true),
1461                ])),
1462                true,
1463            )
1464            .into(),
1465            true,
1466        );
1467
1468        // Succeeds if both maps have the same key and value types, disregards names and nullability
1469        assert!(DFSchema::datatype_is_logically_equal(
1470            &map_field,
1471            &DataType::Map(
1472                Field::new(
1473                    "pairs",
1474                    DataType::Struct(Fields::from(vec![
1475                        Field::new("one", DataType::Int8, false),
1476                        Field::new("two", DataType::Int8, false)
1477                    ])),
1478                    true
1479                )
1480                .into(),
1481                true
1482            )
1483        ));
1484        // Fails if value type is different
1485        assert!(!DFSchema::datatype_is_logically_equal(
1486            &map_field,
1487            &DataType::Map(
1488                Field::new(
1489                    "entries",
1490                    DataType::Struct(Fields::from(vec![
1491                        Field::new("key", DataType::Int8, false),
1492                        Field::new("value", DataType::Int16, true)
1493                    ])),
1494                    true
1495                )
1496                .into(),
1497                true
1498            )
1499        ));
1500
1501        // Fails if key type is different
1502        assert!(!DFSchema::datatype_is_logically_equal(
1503            &map_field,
1504            &DataType::Map(
1505                Field::new(
1506                    "entries",
1507                    DataType::Struct(Fields::from(vec![
1508                        Field::new("key", DataType::Int16, false),
1509                        Field::new("value", DataType::Int8, true)
1510                    ])),
1511                    true
1512                )
1513                .into(),
1514                true
1515            )
1516        ));
1517
1518        // Test structs
1519
1520        let struct_field = DataType::Struct(Fields::from(vec![
1521            Field::new("a", DataType::Int8, true),
1522            Field::new("b", DataType::Int8, true),
1523        ]));
1524
1525        // Succeeds if both have the same field names and data types, disregards nullability
1526        assert!(DFSchema::datatype_is_logically_equal(
1527            &struct_field,
1528            &DataType::Struct(Fields::from(vec![
1529                Field::new("a", DataType::Int8, false),
1530                Field::new("b", DataType::Int8, true),
1531            ]))
1532        ));
1533
1534        // Fails if field names are different
1535        assert!(!DFSchema::datatype_is_logically_equal(
1536            &struct_field,
1537            &DataType::Struct(Fields::from(vec![
1538                Field::new("x", DataType::Int8, true),
1539                Field::new("y", DataType::Int8, true),
1540            ]))
1541        ));
1542
1543        // Fails if types are different
1544        assert!(!DFSchema::datatype_is_logically_equal(
1545            &struct_field,
1546            &DataType::Struct(Fields::from(vec![
1547                Field::new("a", DataType::Int16, true),
1548                Field::new("b", DataType::Int8, true),
1549            ]))
1550        ));
1551
1552        // Fails if the number of fields differs
1553        assert!(!DFSchema::datatype_is_logically_equal(
1554            &struct_field,
1555            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1556        ));
1557    }
1558
1559    #[test]
1560    fn test_datatype_is_logically_equivalent_to_dictionary() {
1561        // Dictionary is logically equal to its value type
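        // (logical equality looks through the dictionary encoding and compares only the value type)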
1562        assert!(DFSchema::datatype_is_logically_equal(
1563            &DataType::Utf8,
1564            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1565        ));
1566    }
1567
1568    #[test]
1569    fn test_datatype_is_semantically_equal() {
1570        assert!(DFSchema::datatype_is_semantically_equal(
1571            &DataType::Int8,
1572            &DataType::Int8
1573        ));
1574
1575        assert!(!DFSchema::datatype_is_semantically_equal(
1576            &DataType::Int8,
1577            &DataType::Int16
1578        ));
1579
1580        // Test lists
1581
1582        // Succeeds if both have the same element type, disregards names and nullability
1583        assert!(DFSchema::datatype_is_semantically_equal(
1584            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1585            &DataType::List(Field::new("element", DataType::Int8, false).into())
1586        ));
1587
1588        // Fails if element type is different
1589        assert!(!DFSchema::datatype_is_semantically_equal(
1590            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1591            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1592        ));
1593
1594        // Test maps
1595        let map_field = DataType::Map(
1596            Field::new(
1597                "entries",
1598                DataType::Struct(Fields::from(vec![
1599                    Field::new("key", DataType::Int8, false),
1600                    Field::new("value", DataType::Int8, true),
1601                ])),
1602                true,
1603            )
1604            .into(),
1605            true,
1606        );
1607
1608        // Succeeds if both maps have the same key and value types, disregards names and nullability
1609        assert!(DFSchema::datatype_is_semantically_equal(
1610            &map_field,
1611            &DataType::Map(
1612                Field::new(
1613                    "pairs",
1614                    DataType::Struct(Fields::from(vec![
1615                        Field::new("one", DataType::Int8, false),
1616                        Field::new("two", DataType::Int8, false)
1617                    ])),
1618                    true
1619                )
1620                .into(),
1621                true
1622            )
1623        ));
1624        // Fails if value type is different
1625        assert!(!DFSchema::datatype_is_semantically_equal(
1626            &map_field,
1627            &DataType::Map(
1628                Field::new(
1629                    "entries",
1630                    DataType::Struct(Fields::from(vec![
1631                        Field::new("key", DataType::Int8, false),
1632                        Field::new("value", DataType::Int16, true)
1633                    ])),
1634                    true
1635                )
1636                .into(),
1637                true
1638            )
1639        ));
1640
1641        // Fails if key type is different
1642        assert!(!DFSchema::datatype_is_semantically_equal(
1643            &map_field,
1644            &DataType::Map(
1645                Field::new(
1646                    "entries",
1647                    DataType::Struct(Fields::from(vec![
1648                        Field::new("key", DataType::Int16, false),
1649                        Field::new("value", DataType::Int8, true)
1650                    ])),
1651                    true
1652                )
1653                .into(),
1654                true
1655            )
1656        ));
1657
1658        // Test structs
1659
1660        let struct_field = DataType::Struct(Fields::from(vec![
1661            Field::new("a", DataType::Int8, true),
1662            Field::new("b", DataType::Int8, true),
1663        ]));
1664
1665        // Succeeds if both have the same field names and data types, disregards nullability
1666        assert!(DFSchema::datatype_is_semantically_equal(
1667            &struct_field,
1668            &DataType::Struct(Fields::from(vec![
1669                Field::new("a", DataType::Int8, false),
1670                Field::new("b", DataType::Int8, true),
1671            ]))
1672        ));
1673
1674        // Fails if field names are different
1675        assert!(!DFSchema::datatype_is_semantically_equal(
1676            &struct_field,
1677            &DataType::Struct(Fields::from(vec![
1678                Field::new("x", DataType::Int8, true),
1679                Field::new("y", DataType::Int8, true),
1680            ]))
1681        ));
1682
1683        // Fails if types are different
1684        assert!(!DFSchema::datatype_is_semantically_equal(
1685            &struct_field,
1686            &DataType::Struct(Fields::from(vec![
1687                Field::new("a", DataType::Int16, true),
1688                Field::new("b", DataType::Int8, true),
1689            ]))
1690        ));
1691
1692        // Fails if the number of fields differs
1693        assert!(!DFSchema::datatype_is_semantically_equal(
1694            &struct_field,
1695            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1696        ));
1697    }
1698
1699    #[test]
1700    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1701        // Dictionary is not semantically equal to its value type
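        // (semantic equality also distinguishes the dictionary-encoded representation from its value type)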
1702        assert!(!DFSchema::datatype_is_semantically_equal(
1703            &DataType::Utf8,
1704            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1705        ));
1706    }
1707
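    // A second fixture schema whose field names do not overlap with test_schema_1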
1708    fn test_schema_2() -> Schema {
1709        Schema::new(vec![
1710            Field::new("c100", DataType::Boolean, true),
1711            Field::new("c101", DataType::Boolean, true),
1712        ])
1713    }
1714
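    // Fixture metadata helpers: test_metadata_n builds n distinct key/value pairs ("k0" -> "v0", "k1" -> "v1", ...)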
1715    fn test_metadata() -> HashMap<String, String> {
1716        test_metadata_n(2)
1717    }
1718
1719    fn test_metadata_n(n: usize) -> HashMap<String, String> {
1720        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1721    }
1722}