datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28    field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29    SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
/// A reference-counted reference to a [DFSchema], i.e. `Arc<DFSchema>`.
pub type DFSchemaRef = Arc<DFSchema>;
39
40/// DFSchema wraps an Arrow schema and adds relation names.
41///
42/// The schema may hold the fields across multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified fields must be unique not only amongst themselves, but also must
47/// have a distinct name from any qualified field names. This allows finding a
48/// qualified field by name to be possible, so long as there aren't multiple
49/// qualified fields with the same name.
50///
51/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
52///
53/// # Creating qualified schemas
54///
55/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
56/// an Arrow schema.
57///
58/// ```rust
59/// use datafusion_common::{DFSchema, Column};
60/// use arrow::datatypes::{DataType, Field, Schema};
61///
62/// let arrow_schema = Schema::new(vec![
63///    Field::new("c1", DataType::Int32, false),
64/// ]);
65///
66/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
67/// let column = Column::from_qualified_name("t1.c1");
68/// assert!(df_schema.has_column(&column));
69///
70/// // Can also access qualified fields with unqualified name, if it's unambiguous
71/// let column = Column::from_qualified_name("c1");
72/// assert!(df_schema.has_column(&column));
73/// ```
74///
75/// # Creating unqualified schemas
76///
77/// Create an unqualified schema using TryFrom:
78///
79/// ```rust
80/// use datafusion_common::{DFSchema, Column};
81/// use arrow::datatypes::{DataType, Field, Schema};
82///
83/// let arrow_schema = Schema::new(vec![
84///    Field::new("c1", DataType::Int32, false),
85/// ]);
86///
87/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
88/// let column = Column::new_unqualified("c1");
89/// assert!(df_schema.has_column(&column));
90/// ```
91///
92/// # Converting back to Arrow schema
93///
94/// Use the `Into` trait to convert `DFSchema` into an Arrow schema:
95///
96/// ```rust
97/// use datafusion_common::DFSchema;
98/// use arrow::datatypes::{Schema, Field};
99/// use std::collections::HashMap;
100///
101/// let df_schema = DFSchema::from_unqualified_fields(vec![
102///    Field::new("c1", arrow::datatypes::DataType::Int32, false),
103/// ].into(),HashMap::new()).unwrap();
104/// let schema = Schema::from(df_schema);
105/// assert_eq!(schema.fields().len(), 1);
106/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Inner Arrow schema reference. Holds the fields and the schema-level
    /// metadata, but no qualifier information.
    inner: SchemaRef,
    /// Optional qualifier for each column in this schema, in the same order as
    /// `self.inner.fields()`. `None` marks an unqualified field.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}
117
118impl DFSchema {
119    /// Creates an empty `DFSchema`
120    pub fn empty() -> Self {
121        Self {
122            inner: Arc::new(Schema::new([])),
123            field_qualifiers: vec![],
124            functional_dependencies: FunctionalDependencies::empty(),
125        }
126    }
127
128    /// Return a reference to the inner Arrow [`Schema`]
129    ///
130    /// Note this does not have the qualifier information
131    pub fn as_arrow(&self) -> &Schema {
132        self.inner.as_ref()
133    }
134
135    /// Return a reference to the inner Arrow [`SchemaRef`]
136    ///
137    /// Note this does not have the qualifier information
138    pub fn inner(&self) -> &SchemaRef {
139        &self.inner
140    }
141
142    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
143    pub fn new_with_metadata(
144        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
145        metadata: HashMap<String, String>,
146    ) -> Result<Self> {
147        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
148            qualified_fields.into_iter().unzip();
149
150        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
151
152        let dfschema = Self {
153            inner: schema,
154            field_qualifiers: qualifiers,
155            functional_dependencies: FunctionalDependencies::empty(),
156        };
157        dfschema.check_names()?;
158        Ok(dfschema)
159    }
160
161    /// Create a new `DFSchema` from a list of Arrow [Field]s
162    pub fn from_unqualified_fields(
163        fields: Fields,
164        metadata: HashMap<String, String>,
165    ) -> Result<Self> {
166        let field_count = fields.len();
167        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
168        let dfschema = Self {
169            inner: schema,
170            field_qualifiers: vec![None; field_count],
171            functional_dependencies: FunctionalDependencies::empty(),
172        };
173        dfschema.check_names()?;
174        Ok(dfschema)
175    }
176
177    /// Create a `DFSchema` from an Arrow schema and a given qualifier
178    ///
179    /// To create a schema from an Arrow schema without a qualifier, use
180    /// `DFSchema::try_from`.
181    pub fn try_from_qualified_schema(
182        qualifier: impl Into<TableReference>,
183        schema: &Schema,
184    ) -> Result<Self> {
185        let qualifier = qualifier.into();
186        let schema = DFSchema {
187            inner: schema.clone().into(),
188            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
189            functional_dependencies: FunctionalDependencies::empty(),
190        };
191        schema.check_names()?;
192        Ok(schema)
193    }
194
195    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
196    pub fn from_field_specific_qualified_schema(
197        qualifiers: Vec<Option<TableReference>>,
198        schema: &SchemaRef,
199    ) -> Result<Self> {
200        let dfschema = Self {
201            inner: Arc::clone(schema),
202            field_qualifiers: qualifiers,
203            functional_dependencies: FunctionalDependencies::empty(),
204        };
205        dfschema.check_names()?;
206        Ok(dfschema)
207    }
208
209    /// Return the same schema, where all fields have a given qualifier.
210    pub fn with_field_specific_qualified_schema(
211        &self,
212        qualifiers: Vec<Option<TableReference>>,
213    ) -> Result<Self> {
214        if qualifiers.len() != self.fields().len() {
215            return _plan_err!(
216                "Number of qualifiers must match number of fields. Expected {}, got {}",
217                self.fields().len(),
218                qualifiers.len()
219            );
220        }
221        Ok(DFSchema {
222            inner: Arc::clone(&self.inner),
223            field_qualifiers: qualifiers,
224            functional_dependencies: self.functional_dependencies.clone(),
225        })
226    }
227
228    /// Check if the schema have some fields with the same name
229    pub fn check_names(&self) -> Result<()> {
230        let mut qualified_names = BTreeSet::new();
231        let mut unqualified_names = BTreeSet::new();
232
233        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
234            if let Some(qualifier) = qualifier {
235                if !qualified_names.insert((qualifier, field.name())) {
236                    return _schema_err!(SchemaError::DuplicateQualifiedField {
237                        qualifier: Box::new(qualifier.clone()),
238                        name: field.name().to_string(),
239                    });
240                }
241            } else if !unqualified_names.insert(field.name()) {
242                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
243                    name: field.name().to_string()
244                });
245            }
246        }
247
248        for (qualifier, name) in qualified_names {
249            if unqualified_names.contains(name) {
250                return _schema_err!(SchemaError::AmbiguousReference {
251                    field: Box::new(Column::new(Some(qualifier.clone()), name))
252                });
253            }
254        }
255        Ok(())
256    }
257
258    /// Assigns functional dependencies.
259    pub fn with_functional_dependencies(
260        mut self,
261        functional_dependencies: FunctionalDependencies,
262    ) -> Result<Self> {
263        if functional_dependencies.is_valid(self.inner.fields.len()) {
264            self.functional_dependencies = functional_dependencies;
265            Ok(self)
266        } else {
267            _plan_err!(
268                "Invalid functional dependency: {:?}",
269                functional_dependencies
270            )
271        }
272    }
273
274    /// Create a new schema that contains the fields from this schema followed by the fields
275    /// from the supplied schema. An error will be returned if there are duplicate field names.
276    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
277        let mut schema_builder = SchemaBuilder::new();
278        schema_builder.extend(self.inner.fields().iter().cloned());
279        schema_builder.extend(schema.fields().iter().cloned());
280        let new_schema = schema_builder.finish();
281
282        let mut new_metadata = self.inner.metadata.clone();
283        new_metadata.extend(schema.inner.metadata.clone());
284        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
285
286        let mut new_qualifiers = self.field_qualifiers.clone();
287        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
288
289        let new_self = Self {
290            inner: Arc::new(new_schema_with_metadata),
291            field_qualifiers: new_qualifiers,
292            functional_dependencies: FunctionalDependencies::empty(),
293        };
294        new_self.check_names()?;
295        Ok(new_self)
296    }
297
298    /// Modify this schema by appending the fields from the supplied schema, ignoring any
299    /// duplicate fields.
300    ///
301    /// ## Merge Precedence
302    ///
303    /// **Schema-level metadata**: Metadata from both schemas is merged.
304    /// If both schemas have the same metadata key, the value from the `other_schema` parameter takes precedence.
305    ///
306    /// **Field-level merging**: Only non-duplicate fields are added. This means that the
307    /// `self` fields will always take precedence over the `other_schema` fields.
308    /// Duplicate field detection is based on:
309    /// - For qualified fields: both qualifier and field name must match
310    /// - For unqualified fields: only field name needs to match
311    ///
312    /// Take note how the precedence for fields & metadata merging differs;
313    /// merging prefers fields from `self` but prefers metadata from `other_schema`.
314    pub fn merge(&mut self, other_schema: &DFSchema) {
315        if other_schema.inner.fields.is_empty() {
316            return;
317        }
318
319        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
320            self.iter().collect();
321        let self_unqualified_names: HashSet<&str> = self
322            .inner
323            .fields
324            .iter()
325            .map(|field| field.name().as_str())
326            .collect();
327
328        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
329        let mut qualifiers = Vec::new();
330        for (qualifier, field) in other_schema.iter() {
331            // skip duplicate columns
332            let duplicated_field = match qualifier {
333                Some(q) => self_fields.contains(&(Some(q), field)),
334                // for unqualified columns, check as unqualified name
335                None => self_unqualified_names.contains(field.name().as_str()),
336            };
337            if !duplicated_field {
338                schema_builder.push(Arc::clone(field));
339                qualifiers.push(qualifier.cloned());
340            }
341        }
342        let mut metadata = self.inner.metadata.clone();
343        metadata.extend(other_schema.inner.metadata.clone());
344
345        let finished = schema_builder.finish();
346        let finished_with_metadata = finished.with_metadata(metadata);
347        self.inner = finished_with_metadata.into();
348        self.field_qualifiers.extend(qualifiers);
349    }
350
351    /// Get a list of fields
352    pub fn fields(&self) -> &Fields {
353        &self.inner.fields
354    }
355
356    /// Returns an immutable reference of a specific `Field` instance selected using an
357    /// offset within the internal `fields` vector
358    pub fn field(&self, i: usize) -> &Field {
359        &self.inner.fields[i]
360    }
361
362    /// Returns an immutable reference of a specific `Field` instance selected using an
363    /// offset within the internal `fields` vector and its qualifier
364    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
365        (self.field_qualifiers[i].as_ref(), self.field(i))
366    }
367
368    pub fn index_of_column_by_name(
369        &self,
370        qualifier: Option<&TableReference>,
371        name: &str,
372    ) -> Option<usize> {
373        let mut matches = self
374            .iter()
375            .enumerate()
376            .filter(|(_, (q, f))| match (qualifier, q) {
377                // field to lookup is qualified.
378                // current field is qualified and not shared between relations, compare both
379                // qualifier and name.
380                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
381                // field to lookup is qualified but current field is unqualified.
382                (Some(_), None) => false,
383                // field to lookup is unqualified, no need to compare qualifier
384                (None, Some(_)) | (None, None) => f.name() == name,
385            })
386            .map(|(idx, _)| idx);
387        matches.next()
388    }
389
390    /// Find the index of the column with the given qualifier and name,
391    /// returning `None` if not found
392    ///
393    /// See [Self::index_of_column] for a version that returns an error if the
394    /// column is not found
395    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
396        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
397    }
398
399    /// Find the index of the column with the given qualifier and name,
400    /// returning `Err` if not found
401    ///
402    /// See [Self::maybe_index_of_column] for a version that returns `None` if
403    /// the column is not found
404    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
405        self.maybe_index_of_column(col)
406            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
407    }
408
409    /// Check if the column is in the current schema
410    pub fn is_column_from_schema(&self, col: &Column) -> bool {
411        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
412            .is_some()
413    }
414
415    /// Find the field with the given name
416    pub fn field_with_name(
417        &self,
418        qualifier: Option<&TableReference>,
419        name: &str,
420    ) -> Result<&Field> {
421        if let Some(qualifier) = qualifier {
422            self.field_with_qualified_name(qualifier, name)
423        } else {
424            self.field_with_unqualified_name(name)
425        }
426    }
427
428    /// Find the qualified field with the given name
429    pub fn qualified_field_with_name(
430        &self,
431        qualifier: Option<&TableReference>,
432        name: &str,
433    ) -> Result<(Option<&TableReference>, &Field)> {
434        if let Some(qualifier) = qualifier {
435            let idx = self
436                .index_of_column_by_name(Some(qualifier), name)
437                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
438            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
439        } else {
440            self.qualified_field_with_unqualified_name(name)
441        }
442    }
443
444    /// Find all fields having the given qualifier
445    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
446        self.iter()
447            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
448            .map(|(_, f)| f.as_ref())
449            .collect()
450    }
451
452    /// Find all fields indices having the given qualifier
453    pub fn fields_indices_with_qualified(
454        &self,
455        qualifier: &TableReference,
456    ) -> Vec<usize> {
457        self.iter()
458            .enumerate()
459            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
460            .collect()
461    }
462
463    /// Find all fields that match the given name
464    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
465        self.fields()
466            .iter()
467            .filter(|field| field.name() == name)
468            .map(|f| f.as_ref())
469            .collect()
470    }
471
472    /// Find all fields that match the given name and return them with their qualifier
473    pub fn qualified_fields_with_unqualified_name(
474        &self,
475        name: &str,
476    ) -> Vec<(Option<&TableReference>, &Field)> {
477        self.iter()
478            .filter(|(_, field)| field.name() == name)
479            .map(|(qualifier, field)| (qualifier, field.as_ref()))
480            .collect()
481    }
482
483    /// Find all fields that match the given name and convert to column
484    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
485        self.iter()
486            .filter(|(_, field)| field.name() == name)
487            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
488            .collect()
489    }
490
491    /// Return all `Column`s for the schema
492    pub fn columns(&self) -> Vec<Column> {
493        self.iter()
494            .map(|(qualifier, field)| {
495                Column::new(qualifier.cloned(), field.name().clone())
496            })
497            .collect()
498    }
499
500    /// Find the qualified field with the given unqualified name
501    pub fn qualified_field_with_unqualified_name(
502        &self,
503        name: &str,
504    ) -> Result<(Option<&TableReference>, &Field)> {
505        let matches = self.qualified_fields_with_unqualified_name(name);
506        match matches.len() {
507            0 => Err(unqualified_field_not_found(name, self)),
508            1 => Ok((matches[0].0, matches[0].1)),
509            _ => {
510                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
511                // Because name may generate from Alias/... . It means that it don't own qualifier.
512                // For example:
513                //             Join on id = b.id
514                // Project a.id as id   TableScan b id
515                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
516                // one field without qualifier, we should return it.
517                let fields_without_qualifier = matches
518                    .iter()
519                    .filter(|(q, _)| q.is_none())
520                    .collect::<Vec<_>>();
521                if fields_without_qualifier.len() == 1 {
522                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
523                } else {
524                    _schema_err!(SchemaError::AmbiguousReference {
525                        field: Box::new(Column::new_unqualified(name.to_string()))
526                    })
527                }
528            }
529        }
530    }
531
532    /// Find the field with the given name
533    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
534        self.qualified_field_with_unqualified_name(name)
535            .map(|(_, field)| field)
536    }
537
538    /// Find the field with the given qualified name
539    pub fn field_with_qualified_name(
540        &self,
541        qualifier: &TableReference,
542        name: &str,
543    ) -> Result<&Field> {
544        let idx = self
545            .index_of_column_by_name(Some(qualifier), name)
546            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
547
548        Ok(self.field(idx))
549    }
550
551    /// Find the field with the given qualified column
552    pub fn qualified_field_from_column(
553        &self,
554        column: &Column,
555    ) -> Result<(Option<&TableReference>, &Field)> {
556        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
557    }
558
559    /// Find if the field exists with the given name
560    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
561        self.fields().iter().any(|field| field.name() == name)
562    }
563
564    /// Find if the field exists with the given qualified name
565    pub fn has_column_with_qualified_name(
566        &self,
567        qualifier: &TableReference,
568        name: &str,
569    ) -> bool {
570        self.iter()
571            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
572    }
573
574    /// Find if the field exists with the given qualified column
575    pub fn has_column(&self, column: &Column) -> bool {
576        match &column.relation {
577            Some(r) => self.has_column_with_qualified_name(r, &column.name),
578            None => self.has_column_with_unqualified_name(&column.name),
579        }
580    }
581
582    /// Check to see if unqualified field names matches field names in Arrow schema
583    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
584        self.inner
585            .fields
586            .iter()
587            .zip(arrow_schema.fields().iter())
588            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
589    }
590
591    /// Check to see if fields in 2 Arrow schemas are compatible
592    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
593    pub fn check_arrow_schema_type_compatible(
594        &self,
595        arrow_schema: &Schema,
596    ) -> Result<()> {
597        let self_arrow_schema: Schema = self.into();
598        self_arrow_schema
599            .fields()
600            .iter()
601            .zip(arrow_schema.fields().iter())
602            .try_for_each(|(l_field, r_field)| {
603                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
604                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
605                                r_field.name(),
606                                r_field.data_type(),
607                                l_field.name(),
608                                l_field.data_type())
609                } else {
610                    Ok(())
611                }
612            })
613    }
614
615    /// Returns true if the two schemas have the same qualified named
616    /// fields with logically equivalent data types. Returns false otherwise.
617    ///
618    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
619    /// equivalence checking.
620    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
621        if self.fields().len() != other.fields().len() {
622            return false;
623        }
624        let self_fields = self.iter();
625        let other_fields = other.iter();
626        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
627            q1 == q2
628                && f1.name() == f2.name()
629                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
630        })
631    }
632
633    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
634    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
635        self.has_equivalent_names_and_types(other).is_ok()
636    }
637
638    /// Returns Ok if the two schemas have the same qualified named
639    /// fields with the compatible data types.
640    ///
641    /// Returns an `Err` with a message otherwise.
642    ///
643    /// This is a specialized version of Eq that ignores differences in
644    /// nullability and metadata.
645    ///
646    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
647    /// logical type checking, which for example would consider a dictionary
648    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
649    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
650        // case 1 : schema length mismatch
651        if self.fields().len() != other.fields().len() {
652            _plan_err!(
653                "Schema mismatch: the schema length are not same \
654            Expected schema length: {}, got: {}",
655                self.fields().len(),
656                other.fields().len()
657            )
658        } else {
659            // case 2 : schema length match, but fields mismatch
660            // check if the fields name are the same and have the same data types
661            self.fields()
662                .iter()
663                .zip(other.fields().iter())
664                .try_for_each(|(f1, f2)| {
665                    if f1.name() != f2.name()
666                        || (!DFSchema::datatype_is_semantically_equal(
667                            f1.data_type(),
668                            f2.data_type(),
669                        ))
670                    {
671                        _plan_err!(
672                            "Schema mismatch: Expected field '{}' with type {:?}, \
673                            but got '{}' with type {:?}.",
674                            f1.name(),
675                            f1.data_type(),
676                            f2.name(),
677                            f2.data_type()
678                        )
679                    } else {
680                        Ok(())
681                    }
682                })
683        }
684    }
685
686    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
687    /// than datatype_is_semantically_equal in that different representations of same data can be
688    /// logically but not semantically equivalent. Semantically equivalent types are always also
689    /// logically equivalent. For example:
690    /// - a Dictionary<K,V> type is logically equal to a plain V type
691    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
692    /// - Utf8 and Utf8View are logically equal
693    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
694        // check nested fields
695        match (dt1, dt2) {
696            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
697                v1.as_ref() == v2.as_ref()
698            }
699            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
700            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
701            (DataType::List(f1), DataType::List(f2))
702            | (DataType::LargeList(f1), DataType::LargeList(f2))
703            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
704                // Don't compare the names of the technical inner field
705                // Usually "item" but that's not mandated
706                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
707            }
708            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
709                // Don't compare the names of the technical inner fields
710                // Usually "entries", "key", "value" but that's not mandated
711                match (f1.data_type(), f2.data_type()) {
712                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
713                        f1_inner.len() == f2_inner.len()
714                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
715                                Self::datatype_is_logically_equal(
716                                    f1.data_type(),
717                                    f2.data_type(),
718                                )
719                            })
720                    }
721                    _ => panic!("Map type should have an inner struct field"),
722                }
723            }
724            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
725                let iter1 = fields1.iter();
726                let iter2 = fields2.iter();
727                fields1.len() == fields2.len() &&
728                        // all fields have to be the same
729                    iter1
730                    .zip(iter2)
731                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
732            }
733            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
734                let iter1 = fields1.iter();
735                let iter2 = fields2.iter();
736                fields1.len() == fields2.len() &&
737                    // all fields have to be the same
738                    iter1
739                        .zip(iter2)
740                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
741            }
742            // Utf8 and Utf8View are logically equivalent
743            (DataType::Utf8, DataType::Utf8View) => true,
744            (DataType::Utf8View, DataType::Utf8) => true,
745            _ => Self::datatype_is_semantically_equal(dt1, dt2),
746        }
747    }
748
749    /// Returns true of two [`DataType`]s are semantically equal (same
750    /// name and type), ignoring both metadata and nullability, and decimal precision/scale.
751    ///
752    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
753    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
754        // check nested fields
755        match (dt1, dt2) {
756            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
757                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
758                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
759            }
760            (DataType::List(f1), DataType::List(f2))
761            | (DataType::LargeList(f1), DataType::LargeList(f2))
762            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
763                // Don't compare the names of the technical inner field
764                // Usually "item" but that's not mandated
765                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
766            }
767            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
768                // Don't compare the names of the technical inner fields
769                // Usually "entries", "key", "value" but that's not mandated
770                match (f1.data_type(), f2.data_type()) {
771                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
772                        f1_inner.len() == f2_inner.len()
773                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
774                                Self::datatype_is_semantically_equal(
775                                    f1.data_type(),
776                                    f2.data_type(),
777                                )
778                            })
779                    }
780                    _ => panic!("Map type should have an inner struct field"),
781                }
782            }
783            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
784                let iter1 = fields1.iter();
785                let iter2 = fields2.iter();
786                fields1.len() == fields2.len() &&
787                        // all fields have to be the same
788                    iter1
789                    .zip(iter2)
790                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
791            }
792            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
793                let iter1 = fields1.iter();
794                let iter2 = fields2.iter();
795                fields1.len() == fields2.len() &&
796                    // all fields have to be the same
797                    iter1
798                        .zip(iter2)
799                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
800            }
801            (
802                DataType::Decimal128(_l_precision, _l_scale),
803                DataType::Decimal128(_r_precision, _r_scale),
804            ) => true,
805            (
806                DataType::Decimal256(_l_precision, _l_scale),
807                DataType::Decimal256(_r_precision, _r_scale),
808            ) => true,
809            _ => dt1 == dt2,
810        }
811    }
812
813    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
814        f1.name() == f2.name()
815            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
816    }
817
818    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
819        f1.name() == f2.name()
820            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
821    }
822
823    /// Strip all field qualifier in schema
824    pub fn strip_qualifiers(self) -> Self {
825        DFSchema {
826            field_qualifiers: vec![None; self.inner.fields.len()],
827            inner: self.inner,
828            functional_dependencies: self.functional_dependencies,
829        }
830    }
831
832    /// Replace all field qualifier with new value in schema
833    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
834        let qualifier = qualifier.into();
835        DFSchema {
836            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
837            inner: self.inner,
838            functional_dependencies: self.functional_dependencies,
839        }
840    }
841
842    /// Get list of fully-qualified field names in this schema
843    pub fn field_names(&self) -> Vec<String> {
844        self.iter()
845            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
846            .collect::<Vec<_>>()
847    }
848
849    /// Get metadata of this schema
850    pub fn metadata(&self) -> &HashMap<String, String> {
851        &self.inner.metadata
852    }
853
    /// Return a reference to the functional dependencies stored alongside
    /// this schema.
    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
        &self.functional_dependencies
    }
858
859    /// Iterate over the qualifiers and fields in the DFSchema
860    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
861        self.field_qualifiers
862            .iter()
863            .zip(self.inner.fields().iter())
864            .map(|(qualifier, field)| (qualifier.as_ref(), field))
865    }
866}
867
868impl From<DFSchema> for Schema {
869    /// Convert DFSchema into a Schema
870    fn from(df_schema: DFSchema) -> Self {
871        let fields: Fields = df_schema.inner.fields.clone();
872        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
873    }
874}
875
876impl From<&DFSchema> for Schema {
877    /// Convert DFSchema reference into a Schema
878    fn from(df_schema: &DFSchema) -> Self {
879        let fields: Fields = df_schema.inner.fields.clone();
880        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
881    }
882}
883
884/// Allow DFSchema to be converted into an Arrow `&Schema`
885impl AsRef<Schema> for DFSchema {
886    fn as_ref(&self) -> &Schema {
887        self.as_arrow()
888    }
889}
890
891/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
892/// example)
893impl AsRef<SchemaRef> for DFSchema {
894    fn as_ref(&self) -> &SchemaRef {
895        self.inner()
896    }
897}
898
899/// Create a `DFSchema` from an Arrow schema
900impl TryFrom<Schema> for DFSchema {
901    type Error = DataFusionError;
902    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
903        Self::try_from(Arc::new(schema))
904    }
905}
906
907impl TryFrom<SchemaRef> for DFSchema {
908    type Error = DataFusionError;
909    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
910        let field_count = schema.fields.len();
911        let dfschema = Self {
912            inner: schema,
913            field_qualifiers: vec![None; field_count],
914            functional_dependencies: FunctionalDependencies::empty(),
915        };
916        // Without checking names, because schema here may have duplicate field names.
917        // For example, Partial AggregateMode will generate duplicate field names from
918        // state_fields.
919        // See <https://github.com/apache/datafusion/issues/17715>
920        // dfschema.check_names()?;
921        Ok(dfschema)
922    }
923}
924
925impl From<DFSchema> for SchemaRef {
926    fn from(df_schema: DFSchema) -> Self {
927        SchemaRef::new(df_schema.into())
928    }
929}
930
931// Hashing refers to a subset of fields considered in PartialEq.
932impl Hash for DFSchema {
933    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
934        self.inner.fields.hash(state);
935        self.inner.metadata.len().hash(state); // HashMap is not hashable
936    }
937}
938
939/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
940pub trait ToDFSchema
941where
942    Self: Sized,
943{
944    /// Attempt to create a DSSchema
945    fn to_dfschema(self) -> Result<DFSchema>;
946
947    /// Attempt to create a DSSchemaRef
948    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
949        Ok(Arc::new(self.to_dfschema()?))
950    }
951}
952
953impl ToDFSchema for Schema {
954    fn to_dfschema(self) -> Result<DFSchema> {
955        DFSchema::try_from(self)
956    }
957}
958
959impl ToDFSchema for SchemaRef {
960    fn to_dfschema(self) -> Result<DFSchema> {
961        DFSchema::try_from(self)
962    }
963}
964
965impl ToDFSchema for Vec<Field> {
966    fn to_dfschema(self) -> Result<DFSchema> {
967        let field_count = self.len();
968        let schema = Schema {
969            fields: self.into(),
970            metadata: HashMap::new(),
971        };
972        let dfschema = DFSchema {
973            inner: schema.into(),
974            field_qualifiers: vec![None; field_count],
975            functional_dependencies: FunctionalDependencies::empty(),
976        };
977        Ok(dfschema)
978    }
979}
980
981impl Display for DFSchema {
982    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
983        write!(
984            f,
985            "fields:[{}], metadata:{:?}",
986            self.iter()
987                .map(|(q, f)| qualified_name(q, f.name()))
988                .collect::<Vec<String>>()
989                .join(", "),
990            self.inner.metadata
991        )
992    }
993}
994
995/// Provides schema information needed by certain methods of `Expr`
996/// (defined in the datafusion-common crate).
997///
998/// Note that this trait is implemented for &[DFSchema] which is
999/// widely used in the DataFusion codebase.
1000pub trait ExprSchema: std::fmt::Debug {
1001    /// Is this column reference nullable?
1002    fn nullable(&self, col: &Column) -> Result<bool> {
1003        Ok(self.field_from_column(col)?.is_nullable())
1004    }
1005
1006    /// What is the datatype of this column?
1007    fn data_type(&self, col: &Column) -> Result<&DataType> {
1008        Ok(self.field_from_column(col)?.data_type())
1009    }
1010
1011    /// Returns the column's optional metadata.
1012    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1013        Ok(self.field_from_column(col)?.metadata())
1014    }
1015
1016    /// Return the column's datatype and nullability
1017    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1018        let field = self.field_from_column(col)?;
1019        Ok((field.data_type(), field.is_nullable()))
1020    }
1021
1022    // Return the column's field
1023    fn field_from_column(&self, col: &Column) -> Result<&Field>;
1024}
1025
1026// Implement `ExprSchema` for `Arc<DFSchema>`
1027impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1028    fn nullable(&self, col: &Column) -> Result<bool> {
1029        self.as_ref().nullable(col)
1030    }
1031
1032    fn data_type(&self, col: &Column) -> Result<&DataType> {
1033        self.as_ref().data_type(col)
1034    }
1035
1036    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1037        ExprSchema::metadata(self.as_ref(), col)
1038    }
1039
1040    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1041        self.as_ref().data_type_and_nullable(col)
1042    }
1043
1044    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1045        self.as_ref().field_from_column(col)
1046    }
1047}
1048
1049impl ExprSchema for DFSchema {
1050    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1051        match &col.relation {
1052            Some(r) => self.field_with_qualified_name(r, &col.name),
1053            None => self.field_with_unqualified_name(&col.name),
1054        }
1055    }
1056}
1057
/// DataFusion-specific extensions to [`Schema`].
pub trait SchemaExt {
    /// This is a specialized version of Eq that ignores differences
    /// in nullability and metadata.
    ///
    /// It works the same as [`DFSchema::equivalent_names_and_types`].
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns `Ok(())` if the two schemas have the same number of fields and
    /// each pair of fields has the same name and logically equivalent data
    /// types. Returns an error otherwise (the implementation for [`Schema`]
    /// builds it with `_plan_err!`).
    ///
    /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
    /// equivalence checking.
    ///
    /// It is only used by insert into cases.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1075
1076impl SchemaExt for Schema {
1077    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1078        if self.fields().len() != other.fields().len() {
1079            return false;
1080        }
1081
1082        self.fields()
1083            .iter()
1084            .zip(other.fields().iter())
1085            .all(|(f1, f2)| {
1086                f1.name() == f2.name()
1087                    && DFSchema::datatype_is_semantically_equal(
1088                        f1.data_type(),
1089                        f2.data_type(),
1090                    )
1091            })
1092    }
1093
1094    // It is only used by insert into cases.
1095    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1096        // case 1 : schema length mismatch
1097        if self.fields().len() != other.fields().len() {
1098            _plan_err!(
1099                "Inserting query must have the same schema length as the table. \
1100            Expected table schema length: {}, got: {}",
1101                self.fields().len(),
1102                other.fields().len()
1103            )
1104        } else {
1105            // case 2 : schema length match, but fields mismatch
1106            // check if the fields name are the same and have the same data types
1107            self.fields()
1108                .iter()
1109                .zip(other.fields().iter())
1110                .try_for_each(|(f1, f2)| {
1111                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1112                        _plan_err!(
1113                            "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
1114                            but got '{}' with type {:?}.",
1115                            f1.name(),
1116                            f1.data_type(),
1117                            f2.name(),
1118                            f2.data_type())
1119                    } else {
1120                        Ok(())
1121                    }
1122                })
1123        }
1124    }
1125}
1126
1127pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1128    match qualifier {
1129        Some(q) => format!("{q}.{name}"),
1130        None => name.to_string(),
1131    }
1132}
1133
1134#[cfg(test)]
1135mod tests {
1136    use crate::assert_contains;
1137
1138    use super::*;
1139
1140    #[test]
1141    fn qualifier_in_name() -> Result<()> {
1142        let col = Column::from_name("t1.c0");
1143        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1144        // lookup with unqualified name "t1.c0"
1145        let err = schema.index_of_column(&col).unwrap_err();
1146        let expected = "Schema error: No field named \"t1.c0\". \
1147            Column names are case sensitive. \
1148            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1149            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1150            Did you mean 't1.c0'?.";
1151        assert_eq!(err.strip_backtrace(), expected);
1152        Ok(())
1153    }
1154
1155    #[test]
1156    fn quoted_qualifiers_in_name() -> Result<()> {
1157        let col = Column::from_name("t1.c0");
1158        let schema = DFSchema::try_from_qualified_schema(
1159            "t1",
1160            &Schema::new(vec![
1161                Field::new("CapitalColumn", DataType::Boolean, true),
1162                Field::new("field.with.period", DataType::Boolean, true),
1163            ]),
1164        )?;
1165
1166        // lookup with unqualified name "t1.c0"
1167        let err = schema.index_of_column(&col).unwrap_err();
1168        let expected = "Schema error: No field named \"t1.c0\". \
1169            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1170        assert_eq!(err.strip_backtrace(), expected);
1171        Ok(())
1172    }
1173
1174    #[test]
1175    fn from_unqualified_schema() -> Result<()> {
1176        let schema = DFSchema::try_from(test_schema_1())?;
1177        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1178        Ok(())
1179    }
1180
1181    #[test]
1182    fn from_qualified_schema() -> Result<()> {
1183        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1184        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1185        Ok(())
1186    }
1187
1188    #[test]
1189    fn test_from_field_specific_qualified_schema() -> Result<()> {
1190        let schema = DFSchema::from_field_specific_qualified_schema(
1191            vec![Some("t1".into()), None],
1192            &Arc::new(Schema::new(vec![
1193                Field::new("c0", DataType::Boolean, true),
1194                Field::new("c1", DataType::Boolean, true),
1195            ])),
1196        )?;
1197        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1198        Ok(())
1199    }
1200
1201    #[test]
1202    fn test_from_qualified_fields() -> Result<()> {
1203        let schema = DFSchema::new_with_metadata(
1204            vec![
1205                (
1206                    Some("t0".into()),
1207                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1208                ),
1209                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1210            ],
1211            HashMap::new(),
1212        )?;
1213        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1214        Ok(())
1215    }
1216
1217    #[test]
1218    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1219        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1220        let arrow_schema: Schema = schema.into();
1221        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
1222        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
1223        assert_eq!(expected, arrow_schema.to_string());
1224        Ok(())
1225    }
1226
1227    #[test]
1228    fn join_qualified() -> Result<()> {
1229        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1230        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1231        let join = left.join(&right)?;
1232        assert_eq!(
1233            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1234            join.to_string()
1235        );
1236        // test valid access
1237        assert!(join
1238            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1239            .is_ok());
1240        assert!(join
1241            .field_with_qualified_name(&TableReference::bare("t2"), "c0")
1242            .is_ok());
1243        // test invalid access
1244        assert!(join.field_with_unqualified_name("c0").is_err());
1245        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1246        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1247        Ok(())
1248    }
1249
1250    #[test]
1251    fn join_qualified_duplicate() -> Result<()> {
1252        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1253        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1254        let join = left.join(&right);
1255        assert_eq!(
1256            join.unwrap_err().strip_backtrace(),
1257            "Schema error: Schema contains duplicate qualified field name t1.c0",
1258        );
1259        Ok(())
1260    }
1261
1262    #[test]
1263    fn join_unqualified_duplicate() -> Result<()> {
1264        let left = DFSchema::try_from(test_schema_1())?;
1265        let right = DFSchema::try_from(test_schema_1())?;
1266        let join = left.join(&right);
1267        assert_eq!(
1268            join.unwrap_err().strip_backtrace(),
1269            "Schema error: Schema contains duplicate unqualified field name c0"
1270        );
1271        Ok(())
1272    }
1273
1274    #[test]
1275    fn join_mixed() -> Result<()> {
1276        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1277        let right = DFSchema::try_from(test_schema_2())?;
1278        let join = left.join(&right)?;
1279        assert_eq!(
1280            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1281            join.to_string()
1282        );
1283        // test valid access
1284        assert!(join
1285            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1286            .is_ok());
1287        assert!(join.field_with_unqualified_name("c0").is_ok());
1288        assert!(join.field_with_unqualified_name("c100").is_ok());
1289        assert!(join.field_with_name(None, "c100").is_ok());
1290        // test invalid access
1291        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1292        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1293        assert!(join
1294            .field_with_qualified_name(&TableReference::bare(""), "c100")
1295            .is_err());
1296        Ok(())
1297    }
1298
1299    #[test]
1300    fn join_mixed_duplicate() -> Result<()> {
1301        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1302        let right = DFSchema::try_from(test_schema_1())?;
1303        let join = left.join(&right);
1304        assert_contains!(join.unwrap_err().to_string(),
1305                         "Schema error: Schema contains qualified \
1306                          field name t1.c0 and unqualified field name c0 which would be ambiguous");
1307        Ok(())
1308    }
1309
1310    #[test]
1311    fn helpful_error_messages() -> Result<()> {
1312        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1313        let expected_help = "Valid fields are t1.c0, t1.c1.";
1314        assert_contains!(
1315            schema
1316                .field_with_qualified_name(&TableReference::bare("x"), "y")
1317                .unwrap_err()
1318                .to_string(),
1319            expected_help
1320        );
1321        assert_contains!(
1322            schema
1323                .field_with_unqualified_name("y")
1324                .unwrap_err()
1325                .to_string(),
1326            expected_help
1327        );
1328        assert!(schema.index_of_column_by_name(None, "y").is_none());
1329        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1330
1331        Ok(())
1332    }
1333
1334    #[test]
1335    fn select_without_valid_fields() {
1336        let schema = DFSchema::empty();
1337
1338        let col = Column::from_qualified_name("t1.c0");
1339        let err = schema.index_of_column(&col).unwrap_err();
1340        let expected = "Schema error: No field named t1.c0.";
1341        assert_eq!(err.strip_backtrace(), expected);
1342
1343        // the same check without qualifier
1344        let col = Column::from_name("c0");
1345        let err = schema.index_of_column(&col).err().unwrap();
1346        let expected = "Schema error: No field named c0.";
1347        assert_eq!(err.strip_backtrace(), expected);
1348    }
1349
1350    #[test]
1351    fn into() {
1352        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1353        let arrow_schema = Schema::new_with_metadata(
1354            vec![Field::new("c0", DataType::Int64, true)],
1355            test_metadata(),
1356        );
1357        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1358
1359        let df_schema = DFSchema {
1360            inner: Arc::clone(&arrow_schema_ref),
1361            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1362            functional_dependencies: FunctionalDependencies::empty(),
1363        };
1364        let df_schema_ref = Arc::new(df_schema.clone());
1365
1366        {
1367            let arrow_schema = arrow_schema.clone();
1368            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1369
1370            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1371            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1372        }
1373
1374        {
1375            let arrow_schema = arrow_schema.clone();
1376            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1377
1378            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1379            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1380        }
1381
1382        // Now, consume the refs
1383        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1384        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1385    }
1386
1387    fn test_schema_1() -> Schema {
1388        Schema::new(vec![
1389            Field::new("c0", DataType::Boolean, true),
1390            Field::new("c1", DataType::Boolean, true),
1391        ])
1392    }
1393    #[test]
1394    fn test_dfschema_to_schema_conversion() {
1395        let mut a_metadata = HashMap::new();
1396        a_metadata.insert("key".to_string(), "value".to_string());
1397        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1398
1399        let mut b_metadata = HashMap::new();
1400        b_metadata.insert("key".to_string(), "value".to_string());
1401        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1402
1403        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1404
1405        let df_schema = DFSchema {
1406            inner: Arc::clone(&schema),
1407            field_qualifiers: vec![None; schema.fields.len()],
1408            functional_dependencies: FunctionalDependencies::empty(),
1409        };
1410
1411        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1412    }
1413
1414    #[test]
1415    fn test_contain_column() -> Result<()> {
1416        // qualified exists
1417        {
1418            let col = Column::from_qualified_name("t1.c0");
1419            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1420            assert!(schema.is_column_from_schema(&col));
1421        }
1422
1423        // qualified not exists
1424        {
1425            let col = Column::from_qualified_name("t1.c2");
1426            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1427            assert!(!schema.is_column_from_schema(&col));
1428        }
1429
1430        // unqualified exists
1431        {
1432            let col = Column::from_name("c0");
1433            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1434            assert!(schema.is_column_from_schema(&col));
1435        }
1436
1437        // unqualified not exists
1438        {
1439            let col = Column::from_name("c2");
1440            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1441            assert!(!schema.is_column_from_schema(&col));
1442        }
1443
1444        Ok(())
1445    }
1446
1447    #[test]
1448    fn test_datatype_is_logically_equal() {
1449        assert!(DFSchema::datatype_is_logically_equal(
1450            &DataType::Int8,
1451            &DataType::Int8
1452        ));
1453
1454        assert!(!DFSchema::datatype_is_logically_equal(
1455            &DataType::Int8,
1456            &DataType::Int16
1457        ));
1458
1459        // Test lists
1460
1461        // Succeeds if both have the same element type, disregards names and nullability
1462        assert!(DFSchema::datatype_is_logically_equal(
1463            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1464            &DataType::List(Field::new("element", DataType::Int8, false).into())
1465        ));
1466
1467        // Fails if element type is different
1468        assert!(!DFSchema::datatype_is_logically_equal(
1469            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1470            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1471        ));
1472
1473        // Test maps
1474        let map_field = DataType::Map(
1475            Field::new(
1476                "entries",
1477                DataType::Struct(Fields::from(vec![
1478                    Field::new("key", DataType::Int8, false),
1479                    Field::new("value", DataType::Int8, true),
1480                ])),
1481                true,
1482            )
1483            .into(),
1484            true,
1485        );
1486
1487        // Succeeds if both maps have the same key and value types, disregards names and nullability
1488        assert!(DFSchema::datatype_is_logically_equal(
1489            &map_field,
1490            &DataType::Map(
1491                Field::new(
1492                    "pairs",
1493                    DataType::Struct(Fields::from(vec![
1494                        Field::new("one", DataType::Int8, false),
1495                        Field::new("two", DataType::Int8, false)
1496                    ])),
1497                    true
1498                )
1499                .into(),
1500                true
1501            )
1502        ));
1503        // Fails if value type is different
1504        assert!(!DFSchema::datatype_is_logically_equal(
1505            &map_field,
1506            &DataType::Map(
1507                Field::new(
1508                    "entries",
1509                    DataType::Struct(Fields::from(vec![
1510                        Field::new("key", DataType::Int8, false),
1511                        Field::new("value", DataType::Int16, true)
1512                    ])),
1513                    true
1514                )
1515                .into(),
1516                true
1517            )
1518        ));
1519
1520        // Fails if key type is different
1521        assert!(!DFSchema::datatype_is_logically_equal(
1522            &map_field,
1523            &DataType::Map(
1524                Field::new(
1525                    "entries",
1526                    DataType::Struct(Fields::from(vec![
1527                        Field::new("key", DataType::Int16, false),
1528                        Field::new("value", DataType::Int8, true)
1529                    ])),
1530                    true
1531                )
1532                .into(),
1533                true
1534            )
1535        ));
1536
1537        // Test structs
1538
1539        let struct_field = DataType::Struct(Fields::from(vec![
1540            Field::new("a", DataType::Int8, true),
1541            Field::new("b", DataType::Int8, true),
1542        ]));
1543
1544        // Succeeds if both have same names and datatypes, ignores nullability
1545        assert!(DFSchema::datatype_is_logically_equal(
1546            &struct_field,
1547            &DataType::Struct(Fields::from(vec![
1548                Field::new("a", DataType::Int8, false),
1549                Field::new("b", DataType::Int8, true),
1550            ]))
1551        ));
1552
1553        // Fails if field names are different
1554        assert!(!DFSchema::datatype_is_logically_equal(
1555            &struct_field,
1556            &DataType::Struct(Fields::from(vec![
1557                Field::new("x", DataType::Int8, true),
1558                Field::new("y", DataType::Int8, true),
1559            ]))
1560        ));
1561
1562        // Fails if types are different
1563        assert!(!DFSchema::datatype_is_logically_equal(
1564            &struct_field,
1565            &DataType::Struct(Fields::from(vec![
1566                Field::new("a", DataType::Int16, true),
1567                Field::new("b", DataType::Int8, true),
1568            ]))
1569        ));
1570
1571        // Fails if more or less fields
1572        assert!(!DFSchema::datatype_is_logically_equal(
1573            &struct_field,
1574            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1575        ));
1576    }
1577
1578    #[test]
1579    fn test_datatype_is_logically_equivalent_to_dictionary() {
1580        // Dictionary is logically equal to its value type
1581        assert!(DFSchema::datatype_is_logically_equal(
1582            &DataType::Utf8,
1583            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1584        ));
1585    }
1586
1587    #[test]
1588    fn test_datatype_is_semantically_equal() {
1589        assert!(DFSchema::datatype_is_semantically_equal(
1590            &DataType::Int8,
1591            &DataType::Int8
1592        ));
1593
1594        assert!(!DFSchema::datatype_is_semantically_equal(
1595            &DataType::Int8,
1596            &DataType::Int16
1597        ));
1598
1599        // Test lists
1600
1601        // Succeeds if both have the same element type, disregards names and nullability
1602        assert!(DFSchema::datatype_is_semantically_equal(
1603            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1604            &DataType::List(Field::new("element", DataType::Int8, false).into())
1605        ));
1606
1607        // Fails if element type is different
1608        assert!(!DFSchema::datatype_is_semantically_equal(
1609            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1610            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1611        ));
1612
1613        // Test maps
1614        let map_field = DataType::Map(
1615            Field::new(
1616                "entries",
1617                DataType::Struct(Fields::from(vec![
1618                    Field::new("key", DataType::Int8, false),
1619                    Field::new("value", DataType::Int8, true),
1620                ])),
1621                true,
1622            )
1623            .into(),
1624            true,
1625        );
1626
1627        // Succeeds if both maps have the same key and value types, disregards names and nullability
1628        assert!(DFSchema::datatype_is_semantically_equal(
1629            &map_field,
1630            &DataType::Map(
1631                Field::new(
1632                    "pairs",
1633                    DataType::Struct(Fields::from(vec![
1634                        Field::new("one", DataType::Int8, false),
1635                        Field::new("two", DataType::Int8, false)
1636                    ])),
1637                    true
1638                )
1639                .into(),
1640                true
1641            )
1642        ));
1643        // Fails if value type is different
1644        assert!(!DFSchema::datatype_is_semantically_equal(
1645            &map_field,
1646            &DataType::Map(
1647                Field::new(
1648                    "entries",
1649                    DataType::Struct(Fields::from(vec![
1650                        Field::new("key", DataType::Int8, false),
1651                        Field::new("value", DataType::Int16, true)
1652                    ])),
1653                    true
1654                )
1655                .into(),
1656                true
1657            )
1658        ));
1659
1660        // Fails if key type is different
1661        assert!(!DFSchema::datatype_is_semantically_equal(
1662            &map_field,
1663            &DataType::Map(
1664                Field::new(
1665                    "entries",
1666                    DataType::Struct(Fields::from(vec![
1667                        Field::new("key", DataType::Int16, false),
1668                        Field::new("value", DataType::Int8, true)
1669                    ])),
1670                    true
1671                )
1672                .into(),
1673                true
1674            )
1675        ));
1676
1677        // Test structs
1678
1679        let struct_field = DataType::Struct(Fields::from(vec![
1680            Field::new("a", DataType::Int8, true),
1681            Field::new("b", DataType::Int8, true),
1682        ]));
1683
1684        // Succeeds if both have same names and datatypes, ignores nullability
1685        assert!(DFSchema::datatype_is_logically_equal(
1686            &struct_field,
1687            &DataType::Struct(Fields::from(vec![
1688                Field::new("a", DataType::Int8, false),
1689                Field::new("b", DataType::Int8, true),
1690            ]))
1691        ));
1692
1693        // Fails if field names are different
1694        assert!(!DFSchema::datatype_is_logically_equal(
1695            &struct_field,
1696            &DataType::Struct(Fields::from(vec![
1697                Field::new("x", DataType::Int8, true),
1698                Field::new("y", DataType::Int8, true),
1699            ]))
1700        ));
1701
1702        // Fails if types are different
1703        assert!(!DFSchema::datatype_is_logically_equal(
1704            &struct_field,
1705            &DataType::Struct(Fields::from(vec![
1706                Field::new("a", DataType::Int16, true),
1707                Field::new("b", DataType::Int8, true),
1708            ]))
1709        ));
1710
1711        // Fails if more or less fields
1712        assert!(!DFSchema::datatype_is_logically_equal(
1713            &struct_field,
1714            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1715        ));
1716    }
1717
1718    #[test]
1719    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1720        // Dictionary is not semantically equal to its value type
1721        assert!(!DFSchema::datatype_is_semantically_equal(
1722            &DataType::Utf8,
1723            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1724        ));
1725    }
1726
1727    fn test_schema_2() -> Schema {
1728        Schema::new(vec![
1729            Field::new("c100", DataType::Boolean, true),
1730            Field::new("c101", DataType::Boolean, true),
1731        ])
1732    }
1733
1734    fn test_metadata() -> HashMap<String, String> {
1735        test_metadata_n(2)
1736    }
1737
1738    fn test_metadata_n(n: usize) -> HashMap<String, String> {
1739        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1740    }
1741}