datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28    field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29    SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
/// A reference-counted reference to a [DFSchema].
///
/// This is an alias for `Arc<DFSchema>`.
pub type DFSchemaRef = Arc<DFSchema>;
39
40/// DFSchema wraps an Arrow schema and adds relation names.
41///
42/// The schema may hold the fields across multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified fields must be unique not only amongst themselves, but also must
47/// have a distinct name from any qualified field names. This allows finding a
48/// qualified field by name to be possible, so long as there aren't multiple
49/// qualified fields with the same name.
50///
51/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
52///
53/// # Creating qualified schemas
54///
55/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
56/// an Arrow schema.
57///
58/// ```rust
59/// use datafusion_common::{DFSchema, Column};
60/// use arrow::datatypes::{DataType, Field, Schema};
61///
62/// let arrow_schema = Schema::new(vec![
63///    Field::new("c1", DataType::Int32, false),
64/// ]);
65///
66/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
67/// let column = Column::from_qualified_name("t1.c1");
68/// assert!(df_schema.has_column(&column));
69///
70/// // Can also access qualified fields with unqualified name, if it's unambiguous
71/// let column = Column::from_qualified_name("c1");
72/// assert!(df_schema.has_column(&column));
73/// ```
74///
75/// # Creating unqualified schemas
76///
77/// Create an unqualified schema using TryFrom:
78///
79/// ```rust
80/// use datafusion_common::{DFSchema, Column};
81/// use arrow::datatypes::{DataType, Field, Schema};
82///
83/// let arrow_schema = Schema::new(vec![
84///    Field::new("c1", DataType::Int32, false),
85/// ]);
86///
87/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
88/// let column = Column::new_unqualified("c1");
89/// assert!(df_schema.has_column(&column));
90/// ```
91///
92/// # Converting back to Arrow schema
93///
94/// Use the `Into` trait to convert `DFSchema` into an Arrow schema:
95///
96/// ```rust
97/// use datafusion_common::DFSchema;
98/// use arrow::datatypes::{Schema, Field};
99/// use std::collections::HashMap;
100///
101/// let df_schema = DFSchema::from_unqualified_fields(vec![
102///    Field::new("c1", arrow::datatypes::DataType::Int32, false),
103/// ].into(),HashMap::new()).unwrap();
104/// let schema = Schema::from(df_schema);
105/// assert_eq!(schema.fields().len(), 1);
106/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Inner Arrow schema reference (field definitions and schema-level metadata).
    inner: SchemaRef,
    /// Optional qualifier for each column in this schema, in the same order
    /// as `self.inner.fields()`. `None` marks an unqualified field.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}
117
118impl DFSchema {
119    /// Creates an empty `DFSchema`
120    pub fn empty() -> Self {
121        Self {
122            inner: Arc::new(Schema::new([])),
123            field_qualifiers: vec![],
124            functional_dependencies: FunctionalDependencies::empty(),
125        }
126    }
127
128    /// Return a reference to the inner Arrow [`Schema`]
129    ///
130    /// Note this does not have the qualifier information
131    pub fn as_arrow(&self) -> &Schema {
132        self.inner.as_ref()
133    }
134
    /// Return a reference to the inner Arrow [`SchemaRef`]
    ///
    /// Note this does not have the qualifier information. Callers that need
    /// an owned handle can clone the returned `SchemaRef`.
    pub fn inner(&self) -> &SchemaRef {
        &self.inner
    }
141
142    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
143    pub fn new_with_metadata(
144        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
145        metadata: HashMap<String, String>,
146    ) -> Result<Self> {
147        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
148            qualified_fields.into_iter().unzip();
149
150        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
151
152        let dfschema = Self {
153            inner: schema,
154            field_qualifiers: qualifiers,
155            functional_dependencies: FunctionalDependencies::empty(),
156        };
157        dfschema.check_names()?;
158        Ok(dfschema)
159    }
160
161    /// Create a new `DFSchema` from a list of Arrow [Field]s
162    pub fn from_unqualified_fields(
163        fields: Fields,
164        metadata: HashMap<String, String>,
165    ) -> Result<Self> {
166        let field_count = fields.len();
167        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
168        let dfschema = Self {
169            inner: schema,
170            field_qualifiers: vec![None; field_count],
171            functional_dependencies: FunctionalDependencies::empty(),
172        };
173        dfschema.check_names()?;
174        Ok(dfschema)
175    }
176
177    /// Create a `DFSchema` from an Arrow schema and a given qualifier
178    ///
179    /// To create a schema from an Arrow schema without a qualifier, use
180    /// `DFSchema::try_from`.
181    pub fn try_from_qualified_schema(
182        qualifier: impl Into<TableReference>,
183        schema: &Schema,
184    ) -> Result<Self> {
185        let qualifier = qualifier.into();
186        let schema = DFSchema {
187            inner: schema.clone().into(),
188            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
189            functional_dependencies: FunctionalDependencies::empty(),
190        };
191        schema.check_names()?;
192        Ok(schema)
193    }
194
195    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
196    pub fn from_field_specific_qualified_schema(
197        qualifiers: Vec<Option<TableReference>>,
198        schema: &SchemaRef,
199    ) -> Result<Self> {
200        let dfschema = Self {
201            inner: Arc::clone(schema),
202            field_qualifiers: qualifiers,
203            functional_dependencies: FunctionalDependencies::empty(),
204        };
205        dfschema.check_names()?;
206        Ok(dfschema)
207    }
208
209    /// Return the same schema, where all fields have a given qualifier.
210    pub fn with_field_specific_qualified_schema(
211        &self,
212        qualifiers: Vec<Option<TableReference>>,
213    ) -> Result<Self> {
214        if qualifiers.len() != self.fields().len() {
215            return _plan_err!(
216                "Number of qualifiers must match number of fields. Expected {}, got {}",
217                self.fields().len(),
218                qualifiers.len()
219            );
220        }
221        Ok(DFSchema {
222            inner: Arc::clone(&self.inner),
223            field_qualifiers: qualifiers,
224            functional_dependencies: self.functional_dependencies.clone(),
225        })
226    }
227
228    /// Check if the schema have some fields with the same name
229    pub fn check_names(&self) -> Result<()> {
230        let mut qualified_names = BTreeSet::new();
231        let mut unqualified_names = BTreeSet::new();
232
233        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
234            if let Some(qualifier) = qualifier {
235                if !qualified_names.insert((qualifier, field.name())) {
236                    return _schema_err!(SchemaError::DuplicateQualifiedField {
237                        qualifier: Box::new(qualifier.clone()),
238                        name: field.name().to_string(),
239                    });
240                }
241            } else if !unqualified_names.insert(field.name()) {
242                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
243                    name: field.name().to_string()
244                });
245            }
246        }
247
248        for (qualifier, name) in qualified_names {
249            if unqualified_names.contains(name) {
250                return _schema_err!(SchemaError::AmbiguousReference {
251                    field: Box::new(Column::new(Some(qualifier.clone()), name))
252                });
253            }
254        }
255        Ok(())
256    }
257
258    /// Assigns functional dependencies.
259    pub fn with_functional_dependencies(
260        mut self,
261        functional_dependencies: FunctionalDependencies,
262    ) -> Result<Self> {
263        if functional_dependencies.is_valid(self.inner.fields.len()) {
264            self.functional_dependencies = functional_dependencies;
265            Ok(self)
266        } else {
267            _plan_err!(
268                "Invalid functional dependency: {:?}",
269                functional_dependencies
270            )
271        }
272    }
273
274    /// Create a new schema that contains the fields from this schema followed by the fields
275    /// from the supplied schema. An error will be returned if there are duplicate field names.
276    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
277        let mut schema_builder = SchemaBuilder::new();
278        schema_builder.extend(self.inner.fields().iter().cloned());
279        schema_builder.extend(schema.fields().iter().cloned());
280        let new_schema = schema_builder.finish();
281
282        let mut new_metadata = self.inner.metadata.clone();
283        new_metadata.extend(schema.inner.metadata.clone());
284        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
285
286        let mut new_qualifiers = self.field_qualifiers.clone();
287        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
288
289        let new_self = Self {
290            inner: Arc::new(new_schema_with_metadata),
291            field_qualifiers: new_qualifiers,
292            functional_dependencies: FunctionalDependencies::empty(),
293        };
294        new_self.check_names()?;
295        Ok(new_self)
296    }
297
298    /// Modify this schema by appending the fields from the supplied schema, ignoring any
299    /// duplicate fields.
300    ///
301    /// ## Merge Precedence
302    ///
303    /// **Schema-level metadata**: Metadata from both schemas is merged.
304    /// If both schemas have the same metadata key, the value from the `other_schema` parameter takes precedence.
305    ///
306    /// **Field-level merging**: Only non-duplicate fields are added. This means that the
307    /// `self` fields will always take precedence over the `other_schema` fields.
308    /// Duplicate field detection is based on:
309    /// - For qualified fields: both qualifier and field name must match
310    /// - For unqualified fields: only field name needs to match
311    ///
312    /// Take note how the precedence for fields & metadata merging differs;
313    /// merging prefers fields from `self` but prefers metadata from `other_schema`.
314    pub fn merge(&mut self, other_schema: &DFSchema) {
315        if other_schema.inner.fields.is_empty() {
316            return;
317        }
318
319        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
320            self.iter().collect();
321        let self_unqualified_names: HashSet<&str> = self
322            .inner
323            .fields
324            .iter()
325            .map(|field| field.name().as_str())
326            .collect();
327
328        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
329        let mut qualifiers = Vec::new();
330        for (qualifier, field) in other_schema.iter() {
331            // skip duplicate columns
332            let duplicated_field = match qualifier {
333                Some(q) => self_fields.contains(&(Some(q), field)),
334                // for unqualified columns, check as unqualified name
335                None => self_unqualified_names.contains(field.name().as_str()),
336            };
337            if !duplicated_field {
338                schema_builder.push(Arc::clone(field));
339                qualifiers.push(qualifier.cloned());
340            }
341        }
342        let mut metadata = self.inner.metadata.clone();
343        metadata.extend(other_schema.inner.metadata.clone());
344
345        let finished = schema_builder.finish();
346        let finished_with_metadata = finished.with_metadata(metadata);
347        self.inner = finished_with_metadata.into();
348        self.field_qualifiers.extend(qualifiers);
349    }
350
    /// Get the list of fields of the inner Arrow schema.
    ///
    /// The returned fields carry no qualifier information.
    pub fn fields(&self) -> &Fields {
        &self.inner.fields
    }
355
    /// Returns an immutable reference of a specific `Field` instance selected using an
    /// offset within the internal `fields` vector
    ///
    /// # Panics
    ///
    /// Panics if `i` is out of bounds.
    pub fn field(&self, i: usize) -> &Field {
        &self.inner.fields[i]
    }
361
362    /// Returns an immutable reference of a specific `Field` instance selected using an
363    /// offset within the internal `fields` vector and its qualifier
364    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
365        (self.field_qualifiers[i].as_ref(), self.field(i))
366    }
367
368    pub fn index_of_column_by_name(
369        &self,
370        qualifier: Option<&TableReference>,
371        name: &str,
372    ) -> Option<usize> {
373        let mut matches = self
374            .iter()
375            .enumerate()
376            .filter(|(_, (q, f))| match (qualifier, q) {
377                // field to lookup is qualified.
378                // current field is qualified and not shared between relations, compare both
379                // qualifier and name.
380                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
381                // field to lookup is qualified but current field is unqualified.
382                (Some(_), None) => false,
383                // field to lookup is unqualified, no need to compare qualifier
384                (None, Some(_)) | (None, None) => f.name() == name,
385            })
386            .map(|(idx, _)| idx);
387        matches.next()
388    }
389
390    /// Find the index of the column with the given qualifier and name,
391    /// returning `None` if not found
392    ///
393    /// See [Self::index_of_column] for a version that returns an error if the
394    /// column is not found
395    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
396        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
397    }
398
399    /// Find the index of the column with the given qualifier and name,
400    /// returning `Err` if not found
401    ///
402    /// See [Self::maybe_index_of_column] for a version that returns `None` if
403    /// the column is not found
404    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
405        self.maybe_index_of_column(col)
406            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
407    }
408
409    /// Check if the column is in the current schema
410    pub fn is_column_from_schema(&self, col: &Column) -> bool {
411        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
412            .is_some()
413    }
414
415    /// Find the field with the given name
416    pub fn field_with_name(
417        &self,
418        qualifier: Option<&TableReference>,
419        name: &str,
420    ) -> Result<&Field> {
421        if let Some(qualifier) = qualifier {
422            self.field_with_qualified_name(qualifier, name)
423        } else {
424            self.field_with_unqualified_name(name)
425        }
426    }
427
428    /// Find the qualified field with the given name
429    pub fn qualified_field_with_name(
430        &self,
431        qualifier: Option<&TableReference>,
432        name: &str,
433    ) -> Result<(Option<&TableReference>, &Field)> {
434        if let Some(qualifier) = qualifier {
435            let idx = self
436                .index_of_column_by_name(Some(qualifier), name)
437                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
438            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
439        } else {
440            self.qualified_field_with_unqualified_name(name)
441        }
442    }
443
444    /// Find all fields having the given qualifier
445    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
446        self.iter()
447            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
448            .map(|(_, f)| f.as_ref())
449            .collect()
450    }
451
452    /// Find all fields indices having the given qualifier
453    pub fn fields_indices_with_qualified(
454        &self,
455        qualifier: &TableReference,
456    ) -> Vec<usize> {
457        self.iter()
458            .enumerate()
459            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
460            .collect()
461    }
462
463    /// Find all fields that match the given name
464    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
465        self.fields()
466            .iter()
467            .filter(|field| field.name() == name)
468            .map(|f| f.as_ref())
469            .collect()
470    }
471
472    /// Find all fields that match the given name and return them with their qualifier
473    pub fn qualified_fields_with_unqualified_name(
474        &self,
475        name: &str,
476    ) -> Vec<(Option<&TableReference>, &Field)> {
477        self.iter()
478            .filter(|(_, field)| field.name() == name)
479            .map(|(qualifier, field)| (qualifier, field.as_ref()))
480            .collect()
481    }
482
483    /// Find all fields that match the given name and convert to column
484    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
485        self.iter()
486            .filter(|(_, field)| field.name() == name)
487            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
488            .collect()
489    }
490
491    /// Return all `Column`s for the schema
492    pub fn columns(&self) -> Vec<Column> {
493        self.iter()
494            .map(|(qualifier, field)| {
495                Column::new(qualifier.cloned(), field.name().clone())
496            })
497            .collect()
498    }
499
500    /// Find the qualified field with the given unqualified name
501    pub fn qualified_field_with_unqualified_name(
502        &self,
503        name: &str,
504    ) -> Result<(Option<&TableReference>, &Field)> {
505        let matches = self.qualified_fields_with_unqualified_name(name);
506        match matches.len() {
507            0 => Err(unqualified_field_not_found(name, self)),
508            1 => Ok((matches[0].0, matches[0].1)),
509            _ => {
510                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
511                // Because name may generate from Alias/... . It means that it don't own qualifier.
512                // For example:
513                //             Join on id = b.id
514                // Project a.id as id   TableScan b id
515                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
516                // one field without qualifier, we should return it.
517                let fields_without_qualifier = matches
518                    .iter()
519                    .filter(|(q, _)| q.is_none())
520                    .collect::<Vec<_>>();
521                if fields_without_qualifier.len() == 1 {
522                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
523                } else {
524                    _schema_err!(SchemaError::AmbiguousReference {
525                        field: Box::new(Column::new_unqualified(name.to_string()))
526                    })
527                }
528            }
529        }
530    }
531
532    /// Find the field with the given name
533    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
534        self.qualified_field_with_unqualified_name(name)
535            .map(|(_, field)| field)
536    }
537
538    /// Find the field with the given qualified name
539    pub fn field_with_qualified_name(
540        &self,
541        qualifier: &TableReference,
542        name: &str,
543    ) -> Result<&Field> {
544        let idx = self
545            .index_of_column_by_name(Some(qualifier), name)
546            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
547
548        Ok(self.field(idx))
549    }
550
    /// Find the field matching the given column, returned together with the
    /// qualifier stored in this schema.
    ///
    /// Uses the column's `relation` (if any) and `name`; see
    /// [`Self::qualified_field_with_name`] for the lookup rules and errors.
    pub fn qualified_field_from_column(
        &self,
        column: &Column,
    ) -> Result<(Option<&TableReference>, &Field)> {
        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
    }
558
559    /// Find if the field exists with the given name
560    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
561        self.fields().iter().any(|field| field.name() == name)
562    }
563
564    /// Find if the field exists with the given qualified name
565    pub fn has_column_with_qualified_name(
566        &self,
567        qualifier: &TableReference,
568        name: &str,
569    ) -> bool {
570        self.iter()
571            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
572    }
573
574    /// Find if the field exists with the given qualified column
575    pub fn has_column(&self, column: &Column) -> bool {
576        match &column.relation {
577            Some(r) => self.has_column_with_qualified_name(r, &column.name),
578            None => self.has_column_with_unqualified_name(&column.name),
579        }
580    }
581
582    /// Check to see if unqualified field names matches field names in Arrow schema
583    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
584        self.inner
585            .fields
586            .iter()
587            .zip(arrow_schema.fields().iter())
588            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
589    }
590
591    /// Check to see if fields in 2 Arrow schemas are compatible
592    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
593    pub fn check_arrow_schema_type_compatible(
594        &self,
595        arrow_schema: &Schema,
596    ) -> Result<()> {
597        let self_arrow_schema: Schema = self.into();
598        self_arrow_schema
599            .fields()
600            .iter()
601            .zip(arrow_schema.fields().iter())
602            .try_for_each(|(l_field, r_field)| {
603                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
604                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
605                                r_field.name(),
606                                r_field.data_type(),
607                                l_field.name(),
608                                l_field.data_type())
609                } else {
610                    Ok(())
611                }
612            })
613    }
614
615    /// Returns true if the two schemas have the same qualified named
616    /// fields with logically equivalent data types. Returns false otherwise.
617    ///
618    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
619    /// equivalence checking.
620    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
621        if self.fields().len() != other.fields().len() {
622            return false;
623        }
624        let self_fields = self.iter();
625        let other_fields = other.iter();
626        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
627            q1 == q2
628                && f1.name() == f2.name()
629                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
630        })
631    }
632
633    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
634    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
635        self.has_equivalent_names_and_types(other).is_ok()
636    }
637
638    /// Returns Ok if the two schemas have the same qualified named
639    /// fields with the compatible data types.
640    ///
641    /// Returns an `Err` with a message otherwise.
642    ///
643    /// This is a specialized version of Eq that ignores differences in
644    /// nullability and metadata.
645    ///
646    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
647    /// logical type checking, which for example would consider a dictionary
648    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
649    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
650        // case 1 : schema length mismatch
651        if self.fields().len() != other.fields().len() {
652            _plan_err!(
653                "Schema mismatch: the schema length are not same \
654            Expected schema length: {}, got: {}",
655                self.fields().len(),
656                other.fields().len()
657            )
658        } else {
659            // case 2 : schema length match, but fields mismatch
660            // check if the fields name are the same and have the same data types
661            self.fields()
662                .iter()
663                .zip(other.fields().iter())
664                .try_for_each(|(f1, f2)| {
665                    if f1.name() != f2.name()
666                        || (!DFSchema::datatype_is_semantically_equal(
667                            f1.data_type(),
668                            f2.data_type(),
669                        ))
670                    {
671                        _plan_err!(
672                            "Schema mismatch: Expected field '{}' with type {:?}, \
673                            but got '{}' with type {:?}.",
674                            f1.name(),
675                            f1.data_type(),
676                            f2.name(),
677                            f2.data_type()
678                        )
679                    } else {
680                        Ok(())
681                    }
682                })
683        }
684    }
685
    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
    /// than datatype_is_semantically_equal in that different representations of same data can be
    /// logically but not semantically equivalent. Semantically equivalent types are always also
    /// logically equivalent. For example:
    /// - a `Dictionary<K, V>` type is logically equal to a plain `V` type
    /// - a `Dictionary<K1, V1>` is also logically equal to `Dictionary<K2, V1>`
    /// - Utf8 and Utf8View are logically equal
    ///
    /// For `List`, `LargeList`, `FixedSizeList` and `Map` only the data types of the
    /// inner fields are compared, not their names; the second component of
    /// `FixedSizeList`, `Map` and `Union` is ignored. Any pair of types not handled
    /// explicitly falls back to [`Self::datatype_is_semantically_equal`].
    ///
    /// # Panics
    ///
    /// Panics if the inner field of a `Map` type is not a `Struct`.
    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
        // check nested fields
        match (dt1, dt2) {
            // Dictionary keys are ignored; the value types must be equal.
            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
                v1.as_ref() == v2.as_ref()
            }
            // A dictionary is logically equal to its plain value type, in
            // either argument order.
            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
            (DataType::List(f1), DataType::List(f2))
            | (DataType::LargeList(f1), DataType::LargeList(f2))
            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
                // Don't compare the names of the technical inner field
                // Usually "item" but that's not mandated
                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
            }
            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
                // Don't compare the names of the technical inner fields
                // Usually "entries", "key", "value" but that's not mandated
                match (f1.data_type(), f2.data_type()) {
                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                        f1_inner.len() == f2_inner.len()
                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                                Self::datatype_is_logically_equal(
                                    f1.data_type(),
                                    f2.data_type(),
                                )
                            })
                    }
                    _ => panic!("Map type should have an inner struct field"),
                }
            }
            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                        // all fields have to be the same
                    iter1
                    .zip(iter2)
                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
            }
            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    // all fields have to be the same
                    iter1
                        .zip(iter2)
                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
            }
            // Utf8 and Utf8View are logically equivalent
            (DataType::Utf8, DataType::Utf8View) => true,
            (DataType::Utf8View, DataType::Utf8) => true,
            // Everything else must be semantically equal.
            _ => Self::datatype_is_semantically_equal(dt1, dt2),
        }
    }
748
    /// Returns true if two [`DataType`]s are semantically equal (same
750    /// name and type), ignoring both metadata and nullability, decimal precision/scale,
751    /// and timezone time units/timezones.
752    ///
753    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
754    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
755        // check nested fields
756        match (dt1, dt2) {
757            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
758                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
759                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
760            }
761            (DataType::List(f1), DataType::List(f2))
762            | (DataType::LargeList(f1), DataType::LargeList(f2))
763            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
764                // Don't compare the names of the technical inner field
765                // Usually "item" but that's not mandated
766                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
767            }
768            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
769                // Don't compare the names of the technical inner fields
770                // Usually "entries", "key", "value" but that's not mandated
771                match (f1.data_type(), f2.data_type()) {
772                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
773                        f1_inner.len() == f2_inner.len()
774                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
775                                Self::datatype_is_semantically_equal(
776                                    f1.data_type(),
777                                    f2.data_type(),
778                                )
779                            })
780                    }
781                    _ => panic!("Map type should have an inner struct field"),
782                }
783            }
784            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
785                let iter1 = fields1.iter();
786                let iter2 = fields2.iter();
787                fields1.len() == fields2.len() &&
788                        // all fields have to be the same
789                    iter1
790                    .zip(iter2)
791                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
792            }
793            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
794                let iter1 = fields1.iter();
795                let iter2 = fields2.iter();
796                fields1.len() == fields2.len() &&
797                    // all fields have to be the same
798                    iter1
799                        .zip(iter2)
800                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
801            }
802            (
803                DataType::Decimal128(_l_precision, _l_scale),
804                DataType::Decimal128(_r_precision, _r_scale),
805            ) => true,
806            (
807                DataType::Decimal256(_l_precision, _l_scale),
808                DataType::Decimal256(_r_precision, _r_scale),
809            ) => true,
810            (
811                DataType::Timestamp(_l_time_unit, _l_timezone),
812                DataType::Timestamp(_r_time_unit, _r_timezone),
813            ) => true,
814            _ => dt1 == dt2,
815        }
816    }
817
818    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
819        f1.name() == f2.name()
820            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
821    }
822
823    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
824        f1.name() == f2.name()
825            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
826    }
827
828    /// Strip all field qualifier in schema
829    pub fn strip_qualifiers(self) -> Self {
830        DFSchema {
831            field_qualifiers: vec![None; self.inner.fields.len()],
832            inner: self.inner,
833            functional_dependencies: self.functional_dependencies,
834        }
835    }
836
837    /// Replace all field qualifier with new value in schema
838    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
839        let qualifier = qualifier.into();
840        DFSchema {
841            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
842            inner: self.inner,
843            functional_dependencies: self.functional_dependencies,
844        }
845    }
846
847    /// Get list of fully-qualified field names in this schema
848    pub fn field_names(&self) -> Vec<String> {
849        self.iter()
850            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
851            .collect::<Vec<_>>()
852    }
853
854    /// Get metadata of this schema
855    pub fn metadata(&self) -> &HashMap<String, String> {
856        &self.inner.metadata
857    }
858
859    /// Get functional dependencies
860    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
861        &self.functional_dependencies
862    }
863
864    /// Iterate over the qualifiers and fields in the DFSchema
865    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
866        self.field_qualifiers
867            .iter()
868            .zip(self.inner.fields().iter())
869            .map(|(qualifier, field)| (qualifier.as_ref(), field))
870    }
871}
872
873impl From<DFSchema> for Schema {
874    /// Convert DFSchema into a Schema
875    fn from(df_schema: DFSchema) -> Self {
876        let fields: Fields = df_schema.inner.fields.clone();
877        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
878    }
879}
880
881impl From<&DFSchema> for Schema {
882    /// Convert DFSchema reference into a Schema
883    fn from(df_schema: &DFSchema) -> Self {
884        let fields: Fields = df_schema.inner.fields.clone();
885        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
886    }
887}
888
889/// Allow DFSchema to be converted into an Arrow `&Schema`
890impl AsRef<Schema> for DFSchema {
891    fn as_ref(&self) -> &Schema {
892        self.as_arrow()
893    }
894}
895
896/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
897/// example)
898impl AsRef<SchemaRef> for DFSchema {
899    fn as_ref(&self) -> &SchemaRef {
900        self.inner()
901    }
902}
903
904/// Create a `DFSchema` from an Arrow schema
905impl TryFrom<Schema> for DFSchema {
906    type Error = DataFusionError;
907    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
908        Self::try_from(Arc::new(schema))
909    }
910}
911
912impl TryFrom<SchemaRef> for DFSchema {
913    type Error = DataFusionError;
914    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
915        let field_count = schema.fields.len();
916        let dfschema = Self {
917            inner: schema,
918            field_qualifiers: vec![None; field_count],
919            functional_dependencies: FunctionalDependencies::empty(),
920        };
921        // Without checking names, because schema here may have duplicate field names.
922        // For example, Partial AggregateMode will generate duplicate field names from
923        // state_fields.
924        // See <https://github.com/apache/datafusion/issues/17715>
925        // dfschema.check_names()?;
926        Ok(dfschema)
927    }
928}
929
930impl From<DFSchema> for SchemaRef {
931    fn from(df_schema: DFSchema) -> Self {
932        SchemaRef::new(df_schema.into())
933    }
934}
935
936// Hashing refers to a subset of fields considered in PartialEq.
937impl Hash for DFSchema {
938    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
939        self.inner.fields.hash(state);
940        self.inner.metadata.len().hash(state); // HashMap is not hashable
941    }
942}
943
944/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
945pub trait ToDFSchema
946where
947    Self: Sized,
948{
949    /// Attempt to create a DSSchema
950    fn to_dfschema(self) -> Result<DFSchema>;
951
952    /// Attempt to create a DSSchemaRef
953    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
954        Ok(Arc::new(self.to_dfschema()?))
955    }
956}
957
958impl ToDFSchema for Schema {
959    fn to_dfschema(self) -> Result<DFSchema> {
960        DFSchema::try_from(self)
961    }
962}
963
964impl ToDFSchema for SchemaRef {
965    fn to_dfschema(self) -> Result<DFSchema> {
966        DFSchema::try_from(self)
967    }
968}
969
970impl ToDFSchema for Vec<Field> {
971    fn to_dfschema(self) -> Result<DFSchema> {
972        let field_count = self.len();
973        let schema = Schema {
974            fields: self.into(),
975            metadata: HashMap::new(),
976        };
977        let dfschema = DFSchema {
978            inner: schema.into(),
979            field_qualifiers: vec![None; field_count],
980            functional_dependencies: FunctionalDependencies::empty(),
981        };
982        Ok(dfschema)
983    }
984}
985
986impl Display for DFSchema {
987    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
988        write!(
989            f,
990            "fields:[{}], metadata:{:?}",
991            self.iter()
992                .map(|(q, f)| qualified_name(q, f.name()))
993                .collect::<Vec<String>>()
994                .join(", "),
995            self.inner.metadata
996        )
997    }
998}
999
1000/// Provides schema information needed by certain methods of `Expr`
1001/// (defined in the datafusion-common crate).
1002///
1003/// Note that this trait is implemented for &[DFSchema] which is
1004/// widely used in the DataFusion codebase.
1005pub trait ExprSchema: std::fmt::Debug {
1006    /// Is this column reference nullable?
1007    fn nullable(&self, col: &Column) -> Result<bool> {
1008        Ok(self.field_from_column(col)?.is_nullable())
1009    }
1010
1011    /// What is the datatype of this column?
1012    fn data_type(&self, col: &Column) -> Result<&DataType> {
1013        Ok(self.field_from_column(col)?.data_type())
1014    }
1015
1016    /// Returns the column's optional metadata.
1017    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1018        Ok(self.field_from_column(col)?.metadata())
1019    }
1020
1021    /// Return the column's datatype and nullability
1022    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1023        let field = self.field_from_column(col)?;
1024        Ok((field.data_type(), field.is_nullable()))
1025    }
1026
1027    // Return the column's field
1028    fn field_from_column(&self, col: &Column) -> Result<&Field>;
1029}
1030
1031// Implement `ExprSchema` for `Arc<DFSchema>`
1032impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1033    fn nullable(&self, col: &Column) -> Result<bool> {
1034        self.as_ref().nullable(col)
1035    }
1036
1037    fn data_type(&self, col: &Column) -> Result<&DataType> {
1038        self.as_ref().data_type(col)
1039    }
1040
1041    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1042        ExprSchema::metadata(self.as_ref(), col)
1043    }
1044
1045    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1046        self.as_ref().data_type_and_nullable(col)
1047    }
1048
1049    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1050        self.as_ref().field_from_column(col)
1051    }
1052}
1053
1054impl ExprSchema for DFSchema {
1055    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1056        match &col.relation {
1057            Some(r) => self.field_with_qualified_name(r, &col.name),
1058            None => self.field_with_unqualified_name(&col.name),
1059        }
1060    }
1061}
1062
1063/// DataFusion-specific extensions to [`Schema`].
1064pub trait SchemaExt {
1065    /// This is a specialized version of Eq that ignores differences
1066    /// in nullability and metadata.
1067    ///
1068    /// It works the same as [`DFSchema::equivalent_names_and_types`].
1069    fn equivalent_names_and_types(&self, other: &Self) -> bool;
1070
1071    /// Returns nothing if the two schemas have the same qualified named
1072    /// fields with logically equivalent data types. Returns internal error otherwise.
1073    ///
1074    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
1075    /// equivalence checking.
1076    ///
1077    /// It is only used by insert into cases.
1078    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
1079}
1080
1081impl SchemaExt for Schema {
1082    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1083        if self.fields().len() != other.fields().len() {
1084            return false;
1085        }
1086
1087        self.fields()
1088            .iter()
1089            .zip(other.fields().iter())
1090            .all(|(f1, f2)| {
1091                f1.name() == f2.name()
1092                    && DFSchema::datatype_is_semantically_equal(
1093                        f1.data_type(),
1094                        f2.data_type(),
1095                    )
1096            })
1097    }
1098
1099    // It is only used by insert into cases.
1100    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1101        // case 1 : schema length mismatch
1102        if self.fields().len() != other.fields().len() {
1103            _plan_err!(
1104                "Inserting query must have the same schema length as the table. \
1105            Expected table schema length: {}, got: {}",
1106                self.fields().len(),
1107                other.fields().len()
1108            )
1109        } else {
1110            // case 2 : schema length match, but fields mismatch
1111            // check if the fields name are the same and have the same data types
1112            self.fields()
1113                .iter()
1114                .zip(other.fields().iter())
1115                .try_for_each(|(f1, f2)| {
1116                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1117                        _plan_err!(
1118                            "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
1119                            but got '{}' with type {:?}.",
1120                            f1.name(),
1121                            f1.data_type(),
1122                            f2.name(),
1123                            f2.data_type())
1124                    } else {
1125                        Ok(())
1126                    }
1127                })
1128        }
1129    }
1130}
1131
1132pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1133    match qualifier {
1134        Some(q) => format!("{q}.{name}"),
1135        None => name.to_string(),
1136    }
1137}
1138
1139#[cfg(test)]
1140mod tests {
1141    use crate::assert_contains;
1142
1143    use super::*;
1144
1145    #[test]
1146    fn qualifier_in_name() -> Result<()> {
1147        let col = Column::from_name("t1.c0");
1148        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1149        // lookup with unqualified name "t1.c0"
1150        let err = schema.index_of_column(&col).unwrap_err();
1151        let expected = "Schema error: No field named \"t1.c0\". \
1152            Column names are case sensitive. \
1153            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1154            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1155            Did you mean 't1.c0'?.";
1156        assert_eq!(err.strip_backtrace(), expected);
1157        Ok(())
1158    }
1159
1160    #[test]
1161    fn quoted_qualifiers_in_name() -> Result<()> {
1162        let col = Column::from_name("t1.c0");
1163        let schema = DFSchema::try_from_qualified_schema(
1164            "t1",
1165            &Schema::new(vec![
1166                Field::new("CapitalColumn", DataType::Boolean, true),
1167                Field::new("field.with.period", DataType::Boolean, true),
1168            ]),
1169        )?;
1170
1171        // lookup with unqualified name "t1.c0"
1172        let err = schema.index_of_column(&col).unwrap_err();
1173        let expected = "Schema error: No field named \"t1.c0\". \
1174            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1175        assert_eq!(err.strip_backtrace(), expected);
1176        Ok(())
1177    }
1178
1179    #[test]
1180    fn from_unqualified_schema() -> Result<()> {
1181        let schema = DFSchema::try_from(test_schema_1())?;
1182        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1183        Ok(())
1184    }
1185
1186    #[test]
1187    fn from_qualified_schema() -> Result<()> {
1188        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1189        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1190        Ok(())
1191    }
1192
1193    #[test]
1194    fn test_from_field_specific_qualified_schema() -> Result<()> {
1195        let schema = DFSchema::from_field_specific_qualified_schema(
1196            vec![Some("t1".into()), None],
1197            &Arc::new(Schema::new(vec![
1198                Field::new("c0", DataType::Boolean, true),
1199                Field::new("c1", DataType::Boolean, true),
1200            ])),
1201        )?;
1202        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1203        Ok(())
1204    }
1205
1206    #[test]
1207    fn test_from_qualified_fields() -> Result<()> {
1208        let schema = DFSchema::new_with_metadata(
1209            vec![
1210                (
1211                    Some("t0".into()),
1212                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1213                ),
1214                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1215            ],
1216            HashMap::new(),
1217        )?;
1218        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1219        Ok(())
1220    }
1221
1222    #[test]
1223    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1224        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1225        let arrow_schema: Schema = schema.into();
1226        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
1227        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
1228        assert_eq!(expected, arrow_schema.to_string());
1229        Ok(())
1230    }
1231
1232    #[test]
1233    fn join_qualified() -> Result<()> {
1234        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1235        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1236        let join = left.join(&right)?;
1237        assert_eq!(
1238            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1239            join.to_string()
1240        );
1241        // test valid access
1242        assert!(join
1243            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1244            .is_ok());
1245        assert!(join
1246            .field_with_qualified_name(&TableReference::bare("t2"), "c0")
1247            .is_ok());
1248        // test invalid access
1249        assert!(join.field_with_unqualified_name("c0").is_err());
1250        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1251        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1252        Ok(())
1253    }
1254
1255    #[test]
1256    fn join_qualified_duplicate() -> Result<()> {
1257        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1258        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1259        let join = left.join(&right);
1260        assert_eq!(
1261            join.unwrap_err().strip_backtrace(),
1262            "Schema error: Schema contains duplicate qualified field name t1.c0",
1263        );
1264        Ok(())
1265    }
1266
1267    #[test]
1268    fn join_unqualified_duplicate() -> Result<()> {
1269        let left = DFSchema::try_from(test_schema_1())?;
1270        let right = DFSchema::try_from(test_schema_1())?;
1271        let join = left.join(&right);
1272        assert_eq!(
1273            join.unwrap_err().strip_backtrace(),
1274            "Schema error: Schema contains duplicate unqualified field name c0"
1275        );
1276        Ok(())
1277    }
1278
1279    #[test]
1280    fn join_mixed() -> Result<()> {
1281        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1282        let right = DFSchema::try_from(test_schema_2())?;
1283        let join = left.join(&right)?;
1284        assert_eq!(
1285            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1286            join.to_string()
1287        );
1288        // test valid access
1289        assert!(join
1290            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1291            .is_ok());
1292        assert!(join.field_with_unqualified_name("c0").is_ok());
1293        assert!(join.field_with_unqualified_name("c100").is_ok());
1294        assert!(join.field_with_name(None, "c100").is_ok());
1295        // test invalid access
1296        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1297        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1298        assert!(join
1299            .field_with_qualified_name(&TableReference::bare(""), "c100")
1300            .is_err());
1301        Ok(())
1302    }
1303
1304    #[test]
1305    fn join_mixed_duplicate() -> Result<()> {
1306        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1307        let right = DFSchema::try_from(test_schema_1())?;
1308        let join = left.join(&right);
1309        assert_contains!(join.unwrap_err().to_string(),
1310                         "Schema error: Schema contains qualified \
1311                          field name t1.c0 and unqualified field name c0 which would be ambiguous");
1312        Ok(())
1313    }
1314
1315    #[test]
1316    fn helpful_error_messages() -> Result<()> {
1317        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1318        let expected_help = "Valid fields are t1.c0, t1.c1.";
1319        assert_contains!(
1320            schema
1321                .field_with_qualified_name(&TableReference::bare("x"), "y")
1322                .unwrap_err()
1323                .to_string(),
1324            expected_help
1325        );
1326        assert_contains!(
1327            schema
1328                .field_with_unqualified_name("y")
1329                .unwrap_err()
1330                .to_string(),
1331            expected_help
1332        );
1333        assert!(schema.index_of_column_by_name(None, "y").is_none());
1334        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1335
1336        Ok(())
1337    }
1338
1339    #[test]
1340    fn select_without_valid_fields() {
1341        let schema = DFSchema::empty();
1342
1343        let col = Column::from_qualified_name("t1.c0");
1344        let err = schema.index_of_column(&col).unwrap_err();
1345        let expected = "Schema error: No field named t1.c0.";
1346        assert_eq!(err.strip_backtrace(), expected);
1347
1348        // the same check without qualifier
1349        let col = Column::from_name("c0");
1350        let err = schema.index_of_column(&col).err().unwrap();
1351        let expected = "Schema error: No field named c0.";
1352        assert_eq!(err.strip_backtrace(), expected);
1353    }
1354
1355    #[test]
1356    fn into() {
1357        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1358        let arrow_schema = Schema::new_with_metadata(
1359            vec![Field::new("c0", DataType::Int64, true)],
1360            test_metadata(),
1361        );
1362        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1363
1364        let df_schema = DFSchema {
1365            inner: Arc::clone(&arrow_schema_ref),
1366            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1367            functional_dependencies: FunctionalDependencies::empty(),
1368        };
1369        let df_schema_ref = Arc::new(df_schema.clone());
1370
1371        {
1372            let arrow_schema = arrow_schema.clone();
1373            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1374
1375            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1376            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1377        }
1378
1379        {
1380            let arrow_schema = arrow_schema.clone();
1381            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1382
1383            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1384            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1385        }
1386
1387        // Now, consume the refs
1388        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1389        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1390    }
1391
1392    fn test_schema_1() -> Schema {
1393        Schema::new(vec![
1394            Field::new("c0", DataType::Boolean, true),
1395            Field::new("c1", DataType::Boolean, true),
1396        ])
1397    }
1398    #[test]
1399    fn test_dfschema_to_schema_conversion() {
1400        let mut a_metadata = HashMap::new();
1401        a_metadata.insert("key".to_string(), "value".to_string());
1402        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1403
1404        let mut b_metadata = HashMap::new();
1405        b_metadata.insert("key".to_string(), "value".to_string());
1406        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1407
1408        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1409
1410        let df_schema = DFSchema {
1411            inner: Arc::clone(&schema),
1412            field_qualifiers: vec![None; schema.fields.len()],
1413            functional_dependencies: FunctionalDependencies::empty(),
1414        };
1415
1416        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1417    }
1418
1419    #[test]
1420    fn test_contain_column() -> Result<()> {
1421        // qualified exists
1422        {
1423            let col = Column::from_qualified_name("t1.c0");
1424            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1425            assert!(schema.is_column_from_schema(&col));
1426        }
1427
1428        // qualified not exists
1429        {
1430            let col = Column::from_qualified_name("t1.c2");
1431            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1432            assert!(!schema.is_column_from_schema(&col));
1433        }
1434
1435        // unqualified exists
1436        {
1437            let col = Column::from_name("c0");
1438            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1439            assert!(schema.is_column_from_schema(&col));
1440        }
1441
1442        // unqualified not exists
1443        {
1444            let col = Column::from_name("c2");
1445            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1446            assert!(!schema.is_column_from_schema(&col));
1447        }
1448
1449        Ok(())
1450    }
1451
1452    #[test]
1453    fn test_datatype_is_logically_equal() {
1454        assert!(DFSchema::datatype_is_logically_equal(
1455            &DataType::Int8,
1456            &DataType::Int8
1457        ));
1458
1459        assert!(!DFSchema::datatype_is_logically_equal(
1460            &DataType::Int8,
1461            &DataType::Int16
1462        ));
1463
1464        // Test lists
1465
1466        // Succeeds if both have the same element type, disregards names and nullability
1467        assert!(DFSchema::datatype_is_logically_equal(
1468            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1469            &DataType::List(Field::new("element", DataType::Int8, false).into())
1470        ));
1471
1472        // Fails if element type is different
1473        assert!(!DFSchema::datatype_is_logically_equal(
1474            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1475            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1476        ));
1477
1478        // Test maps
1479        let map_field = DataType::Map(
1480            Field::new(
1481                "entries",
1482                DataType::Struct(Fields::from(vec![
1483                    Field::new("key", DataType::Int8, false),
1484                    Field::new("value", DataType::Int8, true),
1485                ])),
1486                true,
1487            )
1488            .into(),
1489            true,
1490        );
1491
1492        // Succeeds if both maps have the same key and value types, disregards names and nullability
1493        assert!(DFSchema::datatype_is_logically_equal(
1494            &map_field,
1495            &DataType::Map(
1496                Field::new(
1497                    "pairs",
1498                    DataType::Struct(Fields::from(vec![
1499                        Field::new("one", DataType::Int8, false),
1500                        Field::new("two", DataType::Int8, false)
1501                    ])),
1502                    true
1503                )
1504                .into(),
1505                true
1506            )
1507        ));
1508        // Fails if value type is different
1509        assert!(!DFSchema::datatype_is_logically_equal(
1510            &map_field,
1511            &DataType::Map(
1512                Field::new(
1513                    "entries",
1514                    DataType::Struct(Fields::from(vec![
1515                        Field::new("key", DataType::Int8, false),
1516                        Field::new("value", DataType::Int16, true)
1517                    ])),
1518                    true
1519                )
1520                .into(),
1521                true
1522            )
1523        ));
1524
1525        // Fails if key type is different
1526        assert!(!DFSchema::datatype_is_logically_equal(
1527            &map_field,
1528            &DataType::Map(
1529                Field::new(
1530                    "entries",
1531                    DataType::Struct(Fields::from(vec![
1532                        Field::new("key", DataType::Int16, false),
1533                        Field::new("value", DataType::Int8, true)
1534                    ])),
1535                    true
1536                )
1537                .into(),
1538                true
1539            )
1540        ));
1541
1542        // Test structs
1543
1544        let struct_field = DataType::Struct(Fields::from(vec![
1545            Field::new("a", DataType::Int8, true),
1546            Field::new("b", DataType::Int8, true),
1547        ]));
1548
1549        // Succeeds if both have same names and datatypes, ignores nullability
1550        assert!(DFSchema::datatype_is_logically_equal(
1551            &struct_field,
1552            &DataType::Struct(Fields::from(vec![
1553                Field::new("a", DataType::Int8, false),
1554                Field::new("b", DataType::Int8, true),
1555            ]))
1556        ));
1557
1558        // Fails if field names are different
1559        assert!(!DFSchema::datatype_is_logically_equal(
1560            &struct_field,
1561            &DataType::Struct(Fields::from(vec![
1562                Field::new("x", DataType::Int8, true),
1563                Field::new("y", DataType::Int8, true),
1564            ]))
1565        ));
1566
1567        // Fails if types are different
1568        assert!(!DFSchema::datatype_is_logically_equal(
1569            &struct_field,
1570            &DataType::Struct(Fields::from(vec![
1571                Field::new("a", DataType::Int16, true),
1572                Field::new("b", DataType::Int8, true),
1573            ]))
1574        ));
1575
1576        // Fails if more or less fields
1577        assert!(!DFSchema::datatype_is_logically_equal(
1578            &struct_field,
1579            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1580        ));
1581    }
1582
/// A dictionary-encoded type must compare as logically equal to its value type.
#[test]
fn test_datatype_is_logically_equivalent_to_dictionary() {
    let plain_utf8 = DataType::Utf8;
    // Int32 keys pointing at Utf8 values.
    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    let logically_equal = DFSchema::datatype_is_logically_equal(&plain_utf8, &dict_utf8);
    assert!(logically_equal);
}
1591
/// Exercises `DFSchema::datatype_is_semantically_equal` on primitive, timestamp,
/// list, map and struct types.
///
/// Every assertion in this test goes through the *semantic* comparison; the
/// logical comparison is covered by its own test.
#[test]
fn test_datatype_is_semantically_equal() {
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::Int8,
        &DataType::Int8
    ));

    assert!(!DFSchema::datatype_is_semantically_equal(
        &DataType::Int8,
        &DataType::Int16
    ));

    // Any two timestamp types should match, regardless of unit or timezone
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::Timestamp(
            arrow::datatypes::TimeUnit::Microsecond,
            Some("UTC".into())
        ),
        &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
    ));

    // Test lists

    // Succeeds if both have the same element type, disregards names and nullability
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
        &DataType::List(Field::new("element", DataType::Int8, false).into())
    ));

    // Fails if element type is different
    assert!(!DFSchema::datatype_is_semantically_equal(
        &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
        &DataType::List(Field::new_list_field(DataType::Int16, true).into())
    ));

    // Test maps
    let map_field = DataType::Map(
        Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Int8, false),
                Field::new("value", DataType::Int8, true),
            ])),
            true,
        )
        .into(),
        true,
    );

    // Succeeds if both maps have the same key and value types, disregards names and nullability
    assert!(DFSchema::datatype_is_semantically_equal(
        &map_field,
        &DataType::Map(
            Field::new(
                "pairs",
                DataType::Struct(Fields::from(vec![
                    Field::new("one", DataType::Int8, false),
                    Field::new("two", DataType::Int8, false)
                ])),
                true
            )
            .into(),
            true
        )
    ));

    // Fails if value type is different
    assert!(!DFSchema::datatype_is_semantically_equal(
        &map_field,
        &DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int8, false),
                    Field::new("value", DataType::Int16, true)
                ])),
                true
            )
            .into(),
            true
        )
    ));

    // Fails if key type is different
    assert!(!DFSchema::datatype_is_semantically_equal(
        &map_field,
        &DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int16, false),
                    Field::new("value", DataType::Int8, true)
                ])),
                true
            )
            .into(),
            true
        )
    ));

    // Test structs
    //
    // These must call the semantic comparison (not the logical one) so that
    // struct handling of `datatype_is_semantically_equal` is actually covered.

    let struct_field = DataType::Struct(Fields::from(vec![
        Field::new("a", DataType::Int8, true),
        Field::new("b", DataType::Int8, true),
    ]));

    // Succeeds if both have same names and datatypes, ignores nullability
    assert!(DFSchema::datatype_is_semantically_equal(
        &struct_field,
        &DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int8, false),
            Field::new("b", DataType::Int8, true),
        ]))
    ));

    // Fails if field names are different
    assert!(!DFSchema::datatype_is_semantically_equal(
        &struct_field,
        &DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Int8, true),
            Field::new("y", DataType::Int8, true),
        ]))
    ));

    // Fails if types are different
    assert!(!DFSchema::datatype_is_semantically_equal(
        &struct_field,
        &DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("b", DataType::Int8, true),
        ]))
    ));

    // Fails if more or fewer fields
    assert!(!DFSchema::datatype_is_semantically_equal(
        &struct_field,
        &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
    ));
}
1730
/// A dictionary-encoded type must NOT compare as semantically equal to its
/// value type.
#[test]
fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
    let plain_utf8 = DataType::Utf8;
    // Int32 keys pointing at Utf8 values.
    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    let semantically_equal =
        DFSchema::datatype_is_semantically_equal(&plain_utf8, &dict_utf8);
    assert!(!semantically_equal);
}
1739
1740    fn test_schema_2() -> Schema {
1741        Schema::new(vec![
1742            Field::new("c100", DataType::Boolean, true),
1743            Field::new("c101", DataType::Boolean, true),
1744        ])
1745    }
1746
1747    fn test_metadata() -> HashMap<String, String> {
1748        test_metadata_n(2)
1749    }
1750
/// Builds a metadata map with `n` entries, mapping `k{i}` to `v{i}` for every
/// `i` in `0..n`.
fn test_metadata_n(n: usize) -> HashMap<String, String> {
    let mut metadata = HashMap::with_capacity(n);
    for idx in 0..n {
        metadata.insert(format!("k{idx}"), format!("v{idx}"));
    }
    metadata
}
1754}