1use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{_plan_err, _schema_err, DataFusionError, Result};
27use crate::{
28 Column, FunctionalDependencies, SchemaError, TableReference, field_not_found,
29 unqualified_field_not_found,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34 DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
/// A shared, reference-counted ([`Arc`]) handle to a [`DFSchema`].
pub type DFSchemaRef = Arc<DFSchema>;
39
/// An Arrow [`Schema`] in which every field may additionally carry a
/// [`TableReference`] qualifier, plus a set of [`FunctionalDependencies`].
///
/// `field_qualifiers` is positional: entry `i` is the qualifier of field `i`
/// of `inner` (see [`DFSchema::iter`] and [`DFSchema::qualified_field`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// The wrapped Arrow schema (fields and metadata).
    inner: SchemaRef,
    /// Optional qualifier of each field, in field order.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Functional dependencies recorded for this schema.
    functional_dependencies: FunctionalDependencies,
}
121
122impl DFSchema {
123 pub fn empty() -> Self {
125 Self {
126 inner: Arc::new(Schema::new([])),
127 field_qualifiers: vec![],
128 functional_dependencies: FunctionalDependencies::empty(),
129 }
130 }
131
132 pub fn as_arrow(&self) -> &Schema {
136 self.inner.as_ref()
137 }
138
139 pub fn inner(&self) -> &SchemaRef {
143 &self.inner
144 }
145
146 pub fn new_with_metadata(
148 qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
149 metadata: HashMap<String, String>,
150 ) -> Result<Self> {
151 let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
152 qualified_fields.into_iter().unzip();
153
154 let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
155
156 let dfschema = Self {
157 inner: schema,
158 field_qualifiers: qualifiers,
159 functional_dependencies: FunctionalDependencies::empty(),
160 };
161 dfschema.check_names()?;
162 Ok(dfschema)
163 }
164
165 pub fn from_unqualified_fields(
167 fields: Fields,
168 metadata: HashMap<String, String>,
169 ) -> Result<Self> {
170 let field_count = fields.len();
171 let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
172 let dfschema = Self {
173 inner: schema,
174 field_qualifiers: vec![None; field_count],
175 functional_dependencies: FunctionalDependencies::empty(),
176 };
177 dfschema.check_names()?;
178 Ok(dfschema)
179 }
180
181 pub fn try_from_qualified_schema(
186 qualifier: impl Into<TableReference>,
187 schema: &Schema,
188 ) -> Result<Self> {
189 let qualifier = qualifier.into();
190 let schema = DFSchema {
191 inner: schema.clone().into(),
192 field_qualifiers: vec![Some(qualifier); schema.fields.len()],
193 functional_dependencies: FunctionalDependencies::empty(),
194 };
195 schema.check_names()?;
196 Ok(schema)
197 }
198
199 pub fn from_field_specific_qualified_schema(
201 qualifiers: Vec<Option<TableReference>>,
202 schema: &SchemaRef,
203 ) -> Result<Self> {
204 let dfschema = Self {
205 inner: Arc::clone(schema),
206 field_qualifiers: qualifiers,
207 functional_dependencies: FunctionalDependencies::empty(),
208 };
209 dfschema.check_names()?;
210 Ok(dfschema)
211 }
212
213 pub fn with_field_specific_qualified_schema(
215 &self,
216 qualifiers: Vec<Option<TableReference>>,
217 ) -> Result<Self> {
218 if qualifiers.len() != self.fields().len() {
219 return _plan_err!(
220 "Number of qualifiers must match number of fields. Expected {}, got {}",
221 self.fields().len(),
222 qualifiers.len()
223 );
224 }
225 Ok(DFSchema {
226 inner: Arc::clone(&self.inner),
227 field_qualifiers: qualifiers,
228 functional_dependencies: self.functional_dependencies.clone(),
229 })
230 }
231
232 pub fn check_names(&self) -> Result<()> {
234 let mut qualified_names = BTreeSet::new();
235 let mut unqualified_names = BTreeSet::new();
236
237 for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
238 if let Some(qualifier) = qualifier {
239 if !qualified_names.insert((qualifier, field.name())) {
240 return _schema_err!(SchemaError::DuplicateQualifiedField {
241 qualifier: Box::new(qualifier.clone()),
242 name: field.name().to_string(),
243 });
244 }
245 } else if !unqualified_names.insert(field.name()) {
246 return _schema_err!(SchemaError::DuplicateUnqualifiedField {
247 name: field.name().to_string()
248 });
249 }
250 }
251
252 for (qualifier, name) in qualified_names {
253 if unqualified_names.contains(name) {
254 return _schema_err!(SchemaError::AmbiguousReference {
255 field: Box::new(Column::new(Some(qualifier.clone()), name))
256 });
257 }
258 }
259 Ok(())
260 }
261
262 pub fn with_functional_dependencies(
264 mut self,
265 functional_dependencies: FunctionalDependencies,
266 ) -> Result<Self> {
267 if functional_dependencies.is_valid(self.inner.fields.len()) {
268 self.functional_dependencies = functional_dependencies;
269 Ok(self)
270 } else {
271 _plan_err!(
272 "Invalid functional dependency: {:?}",
273 functional_dependencies
274 )
275 }
276 }
277
278 pub fn join(&self, schema: &DFSchema) -> Result<Self> {
281 let mut schema_builder = SchemaBuilder::new();
282 schema_builder.extend(self.inner.fields().iter().cloned());
283 schema_builder.extend(schema.fields().iter().cloned());
284 let new_schema = schema_builder.finish();
285
286 let mut new_metadata = self.inner.metadata.clone();
287 new_metadata.extend(schema.inner.metadata.clone());
288 let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
289
290 let mut new_qualifiers = self.field_qualifiers.clone();
291 new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
292
293 let new_self = Self {
294 inner: Arc::new(new_schema_with_metadata),
295 field_qualifiers: new_qualifiers,
296 functional_dependencies: FunctionalDependencies::empty(),
297 };
298 new_self.check_names()?;
299 Ok(new_self)
300 }
301
302 pub fn merge(&mut self, other_schema: &DFSchema) {
319 if other_schema.inner.fields.is_empty() {
320 return;
321 }
322
323 let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
324 self.iter().collect();
325 let self_unqualified_names: HashSet<&str> = self
326 .inner
327 .fields
328 .iter()
329 .map(|field| field.name().as_str())
330 .collect();
331
332 let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
333 let mut qualifiers = Vec::new();
334 for (qualifier, field) in other_schema.iter() {
335 let duplicated_field = match qualifier {
337 Some(q) => self_fields.contains(&(Some(q), field)),
338 None => self_unqualified_names.contains(field.name().as_str()),
340 };
341 if !duplicated_field {
342 schema_builder.push(Arc::clone(field));
343 qualifiers.push(qualifier.cloned());
344 }
345 }
346 let mut metadata = self.inner.metadata.clone();
347 metadata.extend(other_schema.inner.metadata.clone());
348
349 let finished = schema_builder.finish();
350 let finished_with_metadata = finished.with_metadata(metadata);
351 self.inner = finished_with_metadata.into();
352 self.field_qualifiers.extend(qualifiers);
353 }
354
355 pub fn fields(&self) -> &Fields {
357 &self.inner.fields
358 }
359
360 pub fn field(&self, i: usize) -> &FieldRef {
365 &self.inner.fields[i]
366 }
367
368 pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &FieldRef) {
371 (self.field_qualifiers[i].as_ref(), self.field(i))
372 }
373
374 pub fn index_of_column_by_name(
375 &self,
376 qualifier: Option<&TableReference>,
377 name: &str,
378 ) -> Option<usize> {
379 let mut matches = self
380 .iter()
381 .enumerate()
382 .filter(|(_, (q, f))| match (qualifier, q) {
383 (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
387 (Some(_), None) => false,
389 (None, Some(_)) | (None, None) => f.name() == name,
391 })
392 .map(|(idx, _)| idx);
393 matches.next()
394 }
395
396 pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
402 self.index_of_column_by_name(col.relation.as_ref(), &col.name)
403 }
404
405 pub fn index_of_column(&self, col: &Column) -> Result<usize> {
411 self.maybe_index_of_column(col)
412 .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
413 }
414
415 pub fn is_column_from_schema(&self, col: &Column) -> bool {
417 self.index_of_column_by_name(col.relation.as_ref(), &col.name)
418 .is_some()
419 }
420
421 pub fn field_with_name(
423 &self,
424 qualifier: Option<&TableReference>,
425 name: &str,
426 ) -> Result<&FieldRef> {
427 if let Some(qualifier) = qualifier {
428 self.field_with_qualified_name(qualifier, name)
429 } else {
430 self.field_with_unqualified_name(name)
431 }
432 }
433
434 pub fn qualified_field_with_name(
436 &self,
437 qualifier: Option<&TableReference>,
438 name: &str,
439 ) -> Result<(Option<&TableReference>, &FieldRef)> {
440 if let Some(qualifier) = qualifier {
441 let idx = self
442 .index_of_column_by_name(Some(qualifier), name)
443 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
444 Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
445 } else {
446 self.qualified_field_with_unqualified_name(name)
447 }
448 }
449
450 pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&FieldRef> {
452 self.iter()
453 .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
454 .map(|(_, f)| f)
455 .collect()
456 }
457
458 pub fn fields_indices_with_qualified(
460 &self,
461 qualifier: &TableReference,
462 ) -> Vec<usize> {
463 self.iter()
464 .enumerate()
465 .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
466 .collect()
467 }
468
469 pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&FieldRef> {
471 self.fields()
472 .iter()
473 .filter(|field| field.name() == name)
474 .collect()
475 }
476
477 pub fn qualified_fields_with_unqualified_name(
479 &self,
480 name: &str,
481 ) -> Vec<(Option<&TableReference>, &FieldRef)> {
482 self.iter()
483 .filter(|(_, field)| field.name() == name)
484 .collect()
485 }
486
487 pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
489 self.iter()
490 .filter(|(_, field)| field.name() == name)
491 .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
492 .collect()
493 }
494
495 pub fn columns(&self) -> Vec<Column> {
497 self.iter()
498 .map(|(qualifier, field)| {
499 Column::new(qualifier.cloned(), field.name().clone())
500 })
501 .collect()
502 }
503
504 pub fn qualified_field_with_unqualified_name(
506 &self,
507 name: &str,
508 ) -> Result<(Option<&TableReference>, &FieldRef)> {
509 let matches = self.qualified_fields_with_unqualified_name(name);
510 match matches.len() {
511 0 => Err(unqualified_field_not_found(name, self)),
512 1 => Ok((matches[0].0, matches[0].1)),
513 _ => {
514 let fields_without_qualifier = matches
522 .iter()
523 .filter(|(q, _)| q.is_none())
524 .collect::<Vec<_>>();
525 if fields_without_qualifier.len() == 1 {
526 Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
527 } else {
528 _schema_err!(SchemaError::AmbiguousReference {
529 field: Box::new(Column::new_unqualified(name.to_string()))
530 })
531 }
532 }
533 }
534 }
535
536 pub fn field_with_unqualified_name(&self, name: &str) -> Result<&FieldRef> {
538 self.qualified_field_with_unqualified_name(name)
539 .map(|(_, field)| field)
540 }
541
542 pub fn field_with_qualified_name(
544 &self,
545 qualifier: &TableReference,
546 name: &str,
547 ) -> Result<&FieldRef> {
548 let idx = self
549 .index_of_column_by_name(Some(qualifier), name)
550 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
551
552 Ok(self.field(idx))
553 }
554
555 pub fn qualified_field_from_column(
557 &self,
558 column: &Column,
559 ) -> Result<(Option<&TableReference>, &FieldRef)> {
560 self.qualified_field_with_name(column.relation.as_ref(), &column.name)
561 }
562
563 pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
565 self.fields().iter().any(|field| field.name() == name)
566 }
567
568 pub fn has_column_with_qualified_name(
570 &self,
571 qualifier: &TableReference,
572 name: &str,
573 ) -> bool {
574 self.iter()
575 .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
576 }
577
578 pub fn has_column(&self, column: &Column) -> bool {
580 match &column.relation {
581 Some(r) => self.has_column_with_qualified_name(r, &column.name),
582 None => self.has_column_with_unqualified_name(&column.name),
583 }
584 }
585
586 pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
588 self.inner
589 .fields
590 .iter()
591 .zip(arrow_schema.fields().iter())
592 .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
593 }
594
595 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
597 pub fn check_arrow_schema_type_compatible(
598 &self,
599 arrow_schema: &Schema,
600 ) -> Result<()> {
601 let self_arrow_schema = self.as_arrow();
602 self_arrow_schema
603 .fields()
604 .iter()
605 .zip(arrow_schema.fields().iter())
606 .try_for_each(|(l_field, r_field)| {
607 if !can_cast_types(r_field.data_type(), l_field.data_type()) {
608 _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
609 r_field.name(),
610 r_field.data_type(),
611 l_field.name(),
612 l_field.data_type())
613 } else {
614 Ok(())
615 }
616 })
617 }
618
619 pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
625 if self.fields().len() != other.fields().len() {
626 return false;
627 }
628 let self_fields = self.iter();
629 let other_fields = other.iter();
630 self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
631 q1 == q2
632 && f1.name() == f2.name()
633 && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
634 })
635 }
636
637 #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
638 pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
639 self.has_equivalent_names_and_types(other).is_ok()
640 }
641
642 pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
654 if self.fields().len() != other.fields().len() {
656 _plan_err!(
657 "Schema mismatch: the schema length are not same \
658 Expected schema length: {}, got: {}",
659 self.fields().len(),
660 other.fields().len()
661 )
662 } else {
663 self.fields()
666 .iter()
667 .zip(other.fields().iter())
668 .try_for_each(|(f1, f2)| {
669 if f1.name() != f2.name()
670 || (!DFSchema::datatype_is_semantically_equal(
671 f1.data_type(),
672 f2.data_type(),
673 ))
674 {
675 _plan_err!(
676 "Schema mismatch: Expected field '{}' with type {}, \
677 but got '{}' with type {}.",
678 f1.name(),
679 f1.data_type(),
680 f2.name(),
681 f2.data_type()
682 )
683 } else {
684 Ok(())
685 }
686 })
687 }
688 }
689
/// Returns `true` when `dt1` and `dt2` are considered logically equal.
///
/// Rules, applied in match order (the first matching arm decides):
/// * Two dictionaries are equal when their value types are `==`; key types
///   are ignored.
/// * A dictionary and a non-dictionary are equal when the dictionary's value
///   type is `==` to the other type.
/// * Two `List`s, two `LargeList`s or two `FixedSizeList`s are equal when
///   their element data types are logically equal; the element field name
///   and the fixed size are not compared.
/// * Two `Map`s are equal when their entry structs have the same number of
///   children with pairwise logically equal data types (names ignored).
/// * Two `Struct`s / `Union`s are equal when they have the same number of
///   fields and each pair matches by name and logical type (and type id for
///   unions).
/// * `Utf8` and `Utf8View` are equal to each other.
/// * Anything else falls back to
///   [`DFSchema::datatype_is_semantically_equal`].
///
/// # Panics
/// Panics when comparing two `Map` types whose entry field is not a `Struct`.
pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
    match (dt1, dt2) {
        // Dictionary arms come first so they also catch dictionary-vs-other.
        (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
            v1.as_ref() == v2.as_ref()
        }
        (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
        (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
        (DataType::List(f1), DataType::List(f2))
        | (DataType::LargeList(f1), DataType::LargeList(f2))
        | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
            // Only the element data type is compared, not the element field name.
            Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
        }
        (DataType::Map(f1, _), DataType::Map(f2, _)) => {
            // Compare the children of the entry struct by type only.
            match (f1.data_type(), f2.data_type()) {
                (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                    f1_inner.len() == f2_inner.len()
                        && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                            Self::datatype_is_logically_equal(
                                f1.data_type(),
                                f2.data_type(),
                            )
                        })
                }
                _ => panic!("Map type should have an inner struct field"),
            }
        }
        (DataType::Struct(fields1), DataType::Struct(fields2)) => {
            let iter1 = fields1.iter();
            let iter2 = fields2.iter();
            fields1.len() == fields2.len() &&
                // every pair of fields must match by name and logical type
                iter1
                .zip(iter2)
                .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
        }
        (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
            let iter1 = fields1.iter();
            let iter2 = fields2.iter();
            fields1.len() == fields2.len() &&
                // every pair must match by type id, name and logical type
                iter1
                .zip(iter2)
                .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
        }
        (DataType::Utf8, DataType::Utf8View) => true,
        (DataType::Utf8View, DataType::Utf8) => true,
        _ => Self::datatype_is_semantically_equal(dt1, dt2),
    }
}
752
/// Returns `true` when `dt1` and `dt2` are considered semantically equal.
///
/// Rules, applied in match order:
/// * Two dictionaries are equal when both key types and value types are
///   semantically equal.
/// * Two `List`s, two `LargeList`s or two `FixedSizeList`s are equal when
///   their element data types are semantically equal; the element field name
///   and the fixed size are not compared.
/// * Two `Map`s are equal when their entry structs have the same number of
///   children with pairwise semantically equal data types (names ignored).
/// * Two `Struct`s / `Union`s are equal when they have the same number of
///   fields and each pair matches by name and semantic type (and type id for
///   unions).
/// * Two decimals of the same width (`Decimal32`, `Decimal64`, `Decimal128`,
///   `Decimal256`) are equal regardless of precision and scale.
/// * Two `Timestamp`s are equal regardless of time unit and timezone.
/// * Anything else is compared with `==`.
///
/// # Panics
/// Panics when comparing two `Map` types whose entry field is not a `Struct`.
pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
    match (dt1, dt2) {
        (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
            Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
                && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
        }
        (DataType::List(f1), DataType::List(f2))
        | (DataType::LargeList(f1), DataType::LargeList(f2))
        | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
            // Only the element data type is compared, not the element field name.
            Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
        }
        (DataType::Map(f1, _), DataType::Map(f2, _)) => {
            // Compare the children of the entry struct by type only.
            match (f1.data_type(), f2.data_type()) {
                (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                    f1_inner.len() == f2_inner.len()
                        && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                            Self::datatype_is_semantically_equal(
                                f1.data_type(),
                                f2.data_type(),
                            )
                        })
                }
                _ => panic!("Map type should have an inner struct field"),
            }
        }
        (DataType::Struct(fields1), DataType::Struct(fields2)) => {
            let iter1 = fields1.iter();
            let iter2 = fields2.iter();
            fields1.len() == fields2.len() &&
                // every pair of fields must match by name and semantic type
                iter1
                .zip(iter2)
                .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
        }
        (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
            let iter1 = fields1.iter();
            let iter2 = fields2.iter();
            fields1.len() == fields2.len() &&
                // every pair must match by type id, name and semantic type
                iter1
                .zip(iter2)
                .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
        }
        // Precision and scale are deliberately left unused in the decimal arms.
        (
            DataType::Decimal32(_l_precision, _l_scale),
            DataType::Decimal32(_r_precision, _r_scale),
        ) => true,
        (
            DataType::Decimal64(_l_precision, _l_scale),
            DataType::Decimal64(_r_precision, _r_scale),
        ) => true,
        (
            DataType::Decimal128(_l_precision, _l_scale),
            DataType::Decimal128(_r_precision, _r_scale),
        ) => true,
        (
            DataType::Decimal256(_l_precision, _l_scale),
            DataType::Decimal256(_r_precision, _r_scale),
        ) => true,
        // Time unit and timezone are likewise left unused.
        (
            DataType::Timestamp(_l_time_unit, _l_timezone),
            DataType::Timestamp(_r_time_unit, _r_timezone),
        ) => true,
        _ => dt1 == dt2,
    }
}
829
830 fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
831 f1.name() == f2.name()
832 && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
833 }
834
835 fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
836 f1.name() == f2.name()
837 && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
838 }
839
840 pub fn strip_qualifiers(self) -> Self {
842 DFSchema {
843 field_qualifiers: vec![None; self.inner.fields.len()],
844 inner: self.inner,
845 functional_dependencies: self.functional_dependencies,
846 }
847 }
848
849 pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
851 let qualifier = qualifier.into();
852 DFSchema {
853 field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
854 inner: self.inner,
855 functional_dependencies: self.functional_dependencies,
856 }
857 }
858
859 pub fn field_names(&self) -> Vec<String> {
861 self.iter()
862 .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
863 .collect::<Vec<_>>()
864 }
865
866 pub fn metadata(&self) -> &HashMap<String, String> {
868 &self.inner.metadata
869 }
870
871 pub fn functional_dependencies(&self) -> &FunctionalDependencies {
873 &self.functional_dependencies
874 }
875
876 pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
878 self.field_qualifiers
879 .iter()
880 .zip(self.inner.fields().iter())
881 .map(|(qualifier, field)| (qualifier.as_ref(), field))
882 }
883 pub fn tree_string(&self) -> impl Display + '_ {
913 let mut result = String::from("root\n");
914
915 for (qualifier, field) in self.iter() {
916 let field_name = match qualifier {
917 Some(q) => format!("{}.{}", q, field.name()),
918 None => field.name().to_string(),
919 };
920
921 format_field_with_indent(
922 &mut result,
923 &field_name,
924 field.data_type(),
925 field.is_nullable(),
926 " ",
927 );
928 }
929
930 if result.ends_with('\n') {
932 result.pop();
933 }
934
935 result
936 }
937}
938
939fn format_field_with_indent(
941 result: &mut String,
942 field_name: &str,
943 data_type: &DataType,
944 nullable: bool,
945 indent: &str,
946) {
947 let nullable_str = nullable.to_string().to_lowercase();
948 let child_indent = format!("{indent}| ");
949
950 match data_type {
951 DataType::List(field) => {
952 result.push_str(&format!(
953 "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
954 ));
955 format_field_with_indent(
956 result,
957 field.name(),
958 field.data_type(),
959 field.is_nullable(),
960 &child_indent,
961 );
962 }
963 DataType::LargeList(field) => {
964 result.push_str(&format!(
965 "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
966 ));
967 format_field_with_indent(
968 result,
969 field.name(),
970 field.data_type(),
971 field.is_nullable(),
972 &child_indent,
973 );
974 }
975 DataType::FixedSizeList(field, _size) => {
976 result.push_str(&format!(
977 "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
978 ));
979 format_field_with_indent(
980 result,
981 field.name(),
982 field.data_type(),
983 field.is_nullable(),
984 &child_indent,
985 );
986 }
987 DataType::Map(field, _) => {
988 result.push_str(&format!(
989 "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
990 ));
991 if let DataType::Struct(inner_fields) = field.data_type()
992 && inner_fields.len() == 2
993 {
994 format_field_with_indent(
995 result,
996 "key",
997 inner_fields[0].data_type(),
998 inner_fields[0].is_nullable(),
999 &child_indent,
1000 );
1001 let value_contains_null = field.is_nullable().to_string().to_lowercase();
1002 match inner_fields[1].data_type() {
1004 DataType::Struct(_)
1005 | DataType::List(_)
1006 | DataType::LargeList(_)
1007 | DataType::FixedSizeList(_, _)
1008 | DataType::Map(_, _) => {
1009 format_field_with_indent(
1010 result,
1011 "value",
1012 inner_fields[1].data_type(),
1013 inner_fields[1].is_nullable(),
1014 &child_indent,
1015 );
1016 }
1017 _ => {
1018 result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1019 format_simple_data_type(inner_fields[1].data_type())));
1020 }
1021 }
1022 }
1023 }
1024 DataType::Struct(fields) => {
1025 result.push_str(&format!(
1026 "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1027 ));
1028 for struct_field in fields {
1029 format_field_with_indent(
1030 result,
1031 struct_field.name(),
1032 struct_field.data_type(),
1033 struct_field.is_nullable(),
1034 &child_indent,
1035 );
1036 }
1037 }
1038 _ => {
1039 let type_str = format_simple_data_type(data_type);
1040 result.push_str(&format!(
1041 "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1042 ));
1043 }
1044 }
1045}
1046
1047fn format_simple_data_type(data_type: &DataType) -> String {
1049 match data_type {
1050 DataType::Boolean => "boolean".to_string(),
1051 DataType::Int8 => "int8".to_string(),
1052 DataType::Int16 => "int16".to_string(),
1053 DataType::Int32 => "int32".to_string(),
1054 DataType::Int64 => "int64".to_string(),
1055 DataType::UInt8 => "uint8".to_string(),
1056 DataType::UInt16 => "uint16".to_string(),
1057 DataType::UInt32 => "uint32".to_string(),
1058 DataType::UInt64 => "uint64".to_string(),
1059 DataType::Float16 => "float16".to_string(),
1060 DataType::Float32 => "float32".to_string(),
1061 DataType::Float64 => "float64".to_string(),
1062 DataType::Utf8 => "utf8".to_string(),
1063 DataType::LargeUtf8 => "large_utf8".to_string(),
1064 DataType::Binary => "binary".to_string(),
1065 DataType::LargeBinary => "large_binary".to_string(),
1066 DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1067 DataType::Date32 => "date32".to_string(),
1068 DataType::Date64 => "date64".to_string(),
1069 DataType::Time32(_) => "time32".to_string(),
1070 DataType::Time64(_) => "time64".to_string(),
1071 DataType::Timestamp(_, tz) => match tz {
1072 Some(tz_str) => format!("timestamp ({tz_str})"),
1073 None => "timestamp".to_string(),
1074 },
1075 DataType::Interval(_) => "interval".to_string(),
1076 DataType::Dictionary(_, value_type) => {
1077 format_simple_data_type(value_type.as_ref())
1078 }
1079 DataType::Decimal32(precision, scale) => {
1080 format!("decimal32({precision}, {scale})")
1081 }
1082 DataType::Decimal64(precision, scale) => {
1083 format!("decimal64({precision}, {scale})")
1084 }
1085 DataType::Decimal128(precision, scale) => {
1086 format!("decimal128({precision}, {scale})")
1087 }
1088 DataType::Decimal256(precision, scale) => {
1089 format!("decimal256({precision}, {scale})")
1090 }
1091 DataType::Null => "null".to_string(),
1092 _ => format!("{data_type}").to_lowercase(),
1093 }
1094}
1095
1096impl AsRef<Schema> for DFSchema {
1098 fn as_ref(&self) -> &Schema {
1099 self.as_arrow()
1100 }
1101}
1102
1103impl AsRef<SchemaRef> for DFSchema {
1106 fn as_ref(&self) -> &SchemaRef {
1107 self.inner()
1108 }
1109}
1110
1111impl TryFrom<Schema> for DFSchema {
1113 type Error = DataFusionError;
1114 fn try_from(schema: Schema) -> Result<Self, Self::Error> {
1115 Self::try_from(Arc::new(schema))
1116 }
1117}
1118
1119impl TryFrom<SchemaRef> for DFSchema {
1120 type Error = DataFusionError;
1121 fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1122 let field_count = schema.fields.len();
1123 let dfschema = Self {
1124 inner: schema,
1125 field_qualifiers: vec![None; field_count],
1126 functional_dependencies: FunctionalDependencies::empty(),
1127 };
1128 Ok(dfschema)
1134 }
1135}
1136
1137impl Hash for DFSchema {
1139 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1140 self.inner.fields.hash(state);
1141 self.inner.metadata.len().hash(state); }
1143}
1144
1145pub trait ToDFSchema
1147where
1148 Self: Sized,
1149{
1150 fn to_dfschema(self) -> Result<DFSchema>;
1152
1153 fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1155 Ok(Arc::new(self.to_dfschema()?))
1156 }
1157}
1158
1159impl ToDFSchema for Schema {
1160 fn to_dfschema(self) -> Result<DFSchema> {
1161 DFSchema::try_from(self)
1162 }
1163}
1164
1165impl ToDFSchema for SchemaRef {
1166 fn to_dfschema(self) -> Result<DFSchema> {
1167 DFSchema::try_from(self)
1168 }
1169}
1170
1171impl ToDFSchema for Vec<Field> {
1172 fn to_dfschema(self) -> Result<DFSchema> {
1173 let field_count = self.len();
1174 let schema = Schema {
1175 fields: self.into(),
1176 metadata: HashMap::new(),
1177 };
1178 let dfschema = DFSchema {
1179 inner: schema.into(),
1180 field_qualifiers: vec![None; field_count],
1181 functional_dependencies: FunctionalDependencies::empty(),
1182 };
1183 Ok(dfschema)
1184 }
1185}
1186
1187impl Display for DFSchema {
1188 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1189 write!(
1190 f,
1191 "fields:[{}], metadata:{:?}",
1192 self.iter()
1193 .map(|(q, f)| qualified_name(q, f.name()))
1194 .collect::<Vec<String>>()
1195 .join(", "),
1196 self.inner.metadata
1197 )
1198 }
1199}
1200
1201pub trait ExprSchema: std::fmt::Debug {
1207 fn nullable(&self, col: &Column) -> Result<bool> {
1209 Ok(self.field_from_column(col)?.is_nullable())
1210 }
1211
1212 fn data_type(&self, col: &Column) -> Result<&DataType> {
1214 Ok(self.field_from_column(col)?.data_type())
1215 }
1216
1217 fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1219 Ok(self.field_from_column(col)?.metadata())
1220 }
1221
1222 fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1224 let field = self.field_from_column(col)?;
1225 Ok((field.data_type(), field.is_nullable()))
1226 }
1227
1228 fn field_from_column(&self, col: &Column) -> Result<&FieldRef>;
1230}
1231
1232impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1234 fn nullable(&self, col: &Column) -> Result<bool> {
1235 self.as_ref().nullable(col)
1236 }
1237
1238 fn data_type(&self, col: &Column) -> Result<&DataType> {
1239 self.as_ref().data_type(col)
1240 }
1241
1242 fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1243 ExprSchema::metadata(self.as_ref(), col)
1244 }
1245
1246 fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1247 self.as_ref().data_type_and_nullable(col)
1248 }
1249
1250 fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1251 self.as_ref().field_from_column(col)
1252 }
1253}
1254
1255impl ExprSchema for DFSchema {
1256 fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1257 match &col.relation {
1258 Some(r) => self.field_with_qualified_name(r, &col.name),
1259 None => self.field_with_unqualified_name(&col.name),
1260 }
1261 }
1262}
1263
/// Schema comparison helpers, implemented in this file for the Arrow `Schema`.
pub trait SchemaExt {
    /// Returns `true` when `self` and `other` are considered to have
    /// equivalent field names and types.
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns `Ok(())` when `self` and `other` are considered to have
    /// logically equivalent field names and types, and an error describing
    /// the mismatch otherwise.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1281
1282impl SchemaExt for Schema {
1283 fn equivalent_names_and_types(&self, other: &Self) -> bool {
1284 if self.fields().len() != other.fields().len() {
1285 return false;
1286 }
1287
1288 self.fields()
1289 .iter()
1290 .zip(other.fields().iter())
1291 .all(|(f1, f2)| {
1292 f1.name() == f2.name()
1293 && DFSchema::datatype_is_semantically_equal(
1294 f1.data_type(),
1295 f2.data_type(),
1296 )
1297 })
1298 }
1299
1300 fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1302 if self.fields().len() != other.fields().len() {
1304 _plan_err!(
1305 "Inserting query must have the same schema length as the table. \
1306 Expected table schema length: {}, got: {}",
1307 self.fields().len(),
1308 other.fields().len()
1309 )
1310 } else {
1311 self.fields()
1314 .iter()
1315 .zip(other.fields().iter())
1316 .try_for_each(|(f1, f2)| {
1317 if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1318 _plan_err!(
1319 "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1320 but got '{}' with type {}.",
1321 f1.name(),
1322 f1.data_type(),
1323 f2.name(),
1324 f2.data_type())
1325 } else {
1326 Ok(())
1327 }
1328 })
1329 }
1330 }
1331}
1332
1333pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1334 match qualifier {
1335 Some(q) => format!("{q}.{name}"),
1336 None => name.to_string(),
1337 }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342 use crate::assert_contains;
1343
1344 use super::*;
1345
// A column literally named "t1.c0" must not resolve to field `c0` of table
// `t1`; the error text suggests the qualified spelling instead.
#[test]
fn qualifier_in_name() -> Result<()> {
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let lookup = Column::from_name("t1.c0");
    let message = schema
        .index_of_column(&lookup)
        .unwrap_err()
        .strip_backtrace();
    let expected = "Schema error: No field named \"t1.c0\". \
        Column names are case sensitive. \
        You can use double quotes to refer to the \"\"t1.c0\"\" column \
        or set the datafusion.sql_parser.enable_ident_normalization configuration. \
        Did you mean 't1.c0'?.";
    assert_eq!(message, expected);
    Ok(())
}
1360
// Field names with capitals or periods are quoted in the list of valid fields.
#[test]
fn quoted_qualifiers_in_name() -> Result<()> {
    let arrow_schema = Schema::new(vec![
        Field::new("CapitalColumn", DataType::Boolean, true),
        Field::new("field.with.period", DataType::Boolean, true),
    ]);
    let schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema)?;

    let lookup = Column::from_name("t1.c0");
    let message = schema
        .index_of_column(&lookup)
        .unwrap_err()
        .strip_backtrace();
    let expected = "Schema error: No field named \"t1.c0\". \
        Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
    assert_eq!(message, expected);
    Ok(())
}
1379
// Converting a plain Arrow schema yields unqualified field names.
#[test]
fn from_unqualified_schema() -> Result<()> {
    let rendered = DFSchema::try_from(test_schema_1())?.to_string();
    assert_eq!("fields:[c0, c1], metadata:{}", rendered);
    Ok(())
}
1386
/// Building a schema with the qualifier `t1` makes every field display as
/// `t1.<name>`.
#[test]
fn from_qualified_schema() -> Result<()> {
    let rendered =
        DFSchema::try_from_qualified_schema("t1", &test_schema_1())?.to_string();
    assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", rendered);
    Ok(())
}
1393
/// Qualifiers can be supplied per field: the first field is qualified with
/// `t1`, the second is left unqualified.
#[test]
fn test_from_field_specific_qualified_schema() -> Result<()> {
    let arrow_schema = Arc::new(Schema::new(vec![
        Field::new("c0", DataType::Boolean, true),
        Field::new("c1", DataType::Boolean, true),
    ]));
    let qualifiers: Vec<Option<TableReference>> = vec![Some("t1".into()), None];

    let schema =
        DFSchema::from_field_specific_qualified_schema(qualifiers, &arrow_schema)?;
    assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
    Ok(())
}
1406
/// `DFSchema::new_with_metadata` accepts a mix of qualified and unqualified
/// fields and displays each one accordingly.
#[test]
fn test_from_qualified_fields() -> Result<()> {
    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = vec![
        (
            Some("t0".into()),
            Arc::new(Field::new("c0", DataType::Boolean, true)),
        ),
        (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
    ];

    let schema = DFSchema::new_with_metadata(qualified_fields, HashMap::new())?;
    assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
    Ok(())
}
1422
/// The Arrow schema exposed by `as_arrow` carries only the plain field
/// names; the `t1` qualifier does not appear in its rendering.
#[test]
fn from_qualified_schema_into_arrow_schema() -> Result<()> {
    let qualified = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let rendered = qualified.as_arrow().to_string();
    insta::assert_snapshot!(rendered, @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
    Ok(())
}
1430
/// Joining two schemas with different qualifiers keeps all four fields.
/// Qualified lookups succeed, while unqualified lookups of `c0` (present
/// under both qualifiers) and of dotted names are expected to fail.
#[test]
fn join_qualified() -> Result<()> {
    let t1 = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let t2 = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
    let joined = t1.join(&t2)?;
    assert_eq!(
        "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
        joined.to_string()
    );

    // Valid access: `c0` resolves under either qualifier.
    for table in ["t1", "t2"] {
        let qualifier = TableReference::bare(table);
        assert!(joined.field_with_qualified_name(&qualifier, "c0").is_ok());
    }

    // Unqualified access: the bare name and the dotted spellings must all fail.
    for name in ["c0", "t1.c0", "t2.c0"] {
        assert!(joined.field_with_unqualified_name(name).is_err());
    }
    Ok(())
}
1455
/// Joining two schemas that share the same qualifier and field names is
/// rejected with a duplicate-qualified-field error.
#[test]
fn join_qualified_duplicate() -> Result<()> {
    let first = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let second = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;

    let failure = first.join(&second).unwrap_err();
    assert_eq!(
        failure.strip_backtrace(),
        "Schema error: Schema contains duplicate qualified field name t1.c0",
    );
    Ok(())
}
1467
/// Joining two unqualified schemas with the same field names is rejected
/// with a duplicate-unqualified-field error.
#[test]
fn join_unqualified_duplicate() -> Result<()> {
    let first = DFSchema::try_from(test_schema_1())?;
    let second = DFSchema::try_from(test_schema_1())?;

    let failure = first.join(&second).unwrap_err();
    assert_eq!(
        failure.strip_backtrace(),
        "Schema error: Schema contains duplicate unqualified field name c0"
    );
    Ok(())
}
1479
/// Joining a qualified schema with an unqualified one (no overlapping names)
/// succeeds; the assertions below pin which lookups resolve and which fail.
#[test]
fn join_mixed() -> Result<()> {
    let qualified = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let unqualified = DFSchema::try_from(test_schema_2())?;
    let joined = qualified.join(&unqualified)?;
    assert_eq!(
        "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
        joined.to_string()
    );

    // Lookups that must resolve.
    let t1 = TableReference::bare("t1");
    assert!(joined.field_with_qualified_name(&t1, "c0").is_ok());
    for name in ["c0", "c100"] {
        assert!(joined.field_with_unqualified_name(name).is_ok());
    }
    assert!(joined.field_with_name(None, "c100").is_ok());

    // Lookups that must fail: dotted names are not split into qualifier and
    // name, and an empty qualifier does not match the unqualified field.
    for name in ["t1.c0", "t1.c100"] {
        assert!(joined.field_with_unqualified_name(name).is_err());
    }
    let empty = TableReference::bare("");
    assert!(joined.field_with_qualified_name(&empty, "c100").is_err());
    Ok(())
}
1506
/// Joining `t1.c0` with an unqualified `c0` is rejected because the two
/// names would be ambiguous.
#[test]
fn join_mixed_duplicate() -> Result<()> {
    let qualified = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let unqualified = DFSchema::try_from(test_schema_1())?;

    let message = qualified.join(&unqualified).unwrap_err().to_string();
    assert_contains!(
        message,
        "Schema error: Schema contains qualified \
         field name t1.c0 and unqualified field name c0 which would be ambiguous"
    );
    Ok(())
}
1519
/// Failed field lookups list the valid fields in their error message, and
/// `index_of_column_by_name` returns `None` for unknown names.
#[test]
fn helpful_error_messages() -> Result<()> {
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let expected_help = "Valid fields are t1.c0, t1.c1.";

    let unknown_table = TableReference::bare("x");
    let qualified_message = schema
        .field_with_qualified_name(&unknown_table, "y")
        .unwrap_err()
        .to_string();
    assert_contains!(qualified_message, expected_help);

    let unqualified_message = schema
        .field_with_unqualified_name("y")
        .unwrap_err()
        .to_string();
    assert_contains!(unqualified_message, expected_help);

    for name in ["y", "t1.c0"] {
        assert!(schema.index_of_column_by_name(None, name).is_none());
    }

    Ok(())
}
1543
/// Looking up any column in an empty schema fails with a short error that
/// has no "valid fields" hint, for both qualified and unqualified columns.
#[test]
fn select_without_valid_fields() {
    let schema = DFSchema::empty();

    let cases = [
        (
            Column::from_qualified_name("t1.c0"),
            "Schema error: No field named t1.c0.",
        ),
        (Column::from_name("c0"), "Schema error: No field named c0."),
    ];

    for (col, expected) in &cases {
        // `unwrap_err` is used for both lookups so that an unexpected `Ok`
        // is reported together with the value that was returned.
        let err = schema.index_of_column(col).unwrap_err();
        assert_eq!(err.strip_backtrace(), *expected);
    }
}
1559
/// `to_dfschema` / `to_dfschema_ref` give the same result whether called on
/// a `Schema` or on an `Arc<Schema>`. The conversions are exercised on
/// clones first and on the original values last.
#[test]
fn into() {
    let arrow_schema = Schema::new_with_metadata(
        vec![Field::new("c0", DataType::Int64, true)],
        test_metadata(),
    );
    let arrow_schema_ref = Arc::new(arrow_schema.clone());

    // Reference value built by hand: same inner schema, no qualifiers.
    let expected = DFSchema {
        inner: Arc::clone(&arrow_schema_ref),
        field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
        functional_dependencies: FunctionalDependencies::empty(),
    };
    let expected_ref = Arc::new(expected.clone());

    // Conversions on clones.
    assert_eq!(expected, arrow_schema.clone().to_dfschema().unwrap());
    assert_eq!(
        expected,
        Arc::clone(&arrow_schema_ref).to_dfschema().unwrap()
    );
    assert_eq!(
        expected_ref,
        arrow_schema.clone().to_dfschema_ref().unwrap()
    );
    assert_eq!(
        expected_ref,
        Arc::clone(&arrow_schema_ref).to_dfschema_ref().unwrap()
    );

    // Conversions on the original values.
    assert_eq!(expected_ref, arrow_schema.to_dfschema_ref().unwrap());
    assert_eq!(expected_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
}
1596
1597 fn test_schema_1() -> Schema {
1598 Schema::new(vec![
1599 Field::new("c0", DataType::Boolean, true),
1600 Field::new("c1", DataType::Boolean, true),
1601 ])
1602 }
/// A `DFSchema` wrapping an Arrow schema exposes the same schema-level
/// metadata and the same fields, including the metadata attached to each
/// individual field.
#[test]
fn test_dfschema_to_schema_conversion() {
    let field_metadata =
        HashMap::from([("key".to_string(), "value".to_string())]);
    let a_field =
        Field::new("a", DataType::Int64, false).with_metadata(field_metadata.clone());
    let b_field =
        Field::new("b", DataType::Int64, false).with_metadata(field_metadata.clone());

    let schema = Arc::new(Schema::new(vec![a_field, b_field]));

    let df_schema = DFSchema {
        inner: Arc::clone(&schema),
        field_qualifiers: vec![None; schema.fields.len()],
        functional_dependencies: FunctionalDependencies::empty(),
    };

    // Schema-level metadata (empty on both sides for this fixture).
    assert_eq!(df_schema.inner.metadata(), schema.metadata());

    // The schema-level check alone never looks at the metadata set on the
    // fields above, so compare the fields and their metadata explicitly.
    assert_eq!(df_schema.as_arrow().fields(), schema.fields());
    for field in df_schema.as_arrow().fields() {
        assert_eq!(field.metadata(), &field_metadata);
    }
}
1623
/// `is_column_from_schema` accepts qualified and unqualified columns that
/// exist in the `t1`-qualified schema and rejects ones that do not.
#[test]
fn test_contain_column() -> Result<()> {
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;

    let cases = [
        // qualified, exists
        (Column::from_qualified_name("t1.c0"), true),
        // qualified, does not exist
        (Column::from_qualified_name("t1.c2"), false),
        // unqualified, exists
        (Column::from_name("c0"), true),
        // unqualified, does not exist
        (Column::from_name("c2"), false),
    ];

    for (col, expected) in &cases {
        assert_eq!(schema.is_column_from_schema(col), *expected);
    }

    Ok(())
}
1656
/// Pins `DFSchema::datatype_is_logically_equal` for primitive, list, map and
/// struct types.
#[test]
fn test_datatype_is_logically_equal() {
    // Small builders that keep the nested fixtures readable.
    let list_of = |item: Field| DataType::List(Arc::new(item));
    let struct_of = |members: Vec<Field>| DataType::Struct(Fields::from(members));
    let map_of = |entries: &str, key: Field, value: Field| {
        DataType::Map(
            Arc::new(Field::new(entries, struct_of(vec![key, value]), true)),
            true,
        )
    };
    let same = DFSchema::datatype_is_logically_equal;

    // Primitives: equal only to themselves.
    assert!(same(&DataType::Int8, &DataType::Int8));
    assert!(!same(&DataType::Int8, &DataType::Int16));

    // Lists: equal when the item types match, even if the item field was
    // built with a different name and nullability.
    let int8_list = list_of(Field::new_list_field(DataType::Int8, true));
    assert!(same(
        &int8_list,
        &list_of(Field::new("element", DataType::Int8, false))
    ));
    // Lists with different item types are not equal.
    assert!(!same(
        &int8_list,
        &list_of(Field::new_list_field(DataType::Int16, true))
    ));

    // Maps: the entry/key/value field names and value nullability may
    // differ, but the key and value types must match.
    let map_field = map_of(
        "entries",
        Field::new("key", DataType::Int8, false),
        Field::new("value", DataType::Int8, true),
    );
    assert!(same(
        &map_field,
        &map_of(
            "pairs",
            Field::new("one", DataType::Int8, false),
            Field::new("two", DataType::Int8, false),
        )
    ));
    // Different value type.
    assert!(!same(
        &map_field,
        &map_of(
            "entries",
            Field::new("key", DataType::Int8, false),
            Field::new("value", DataType::Int16, true),
        )
    ));
    // Different key type.
    assert!(!same(
        &map_field,
        &map_of(
            "entries",
            Field::new("key", DataType::Int16, false),
            Field::new("value", DataType::Int8, true),
        )
    ));

    // Structs: field names, types and count must match; nullability may differ.
    let struct_field = struct_of(vec![
        Field::new("a", DataType::Int8, true),
        Field::new("b", DataType::Int8, true),
    ]);
    assert!(same(
        &struct_field,
        &struct_of(vec![
            Field::new("a", DataType::Int8, false),
            Field::new("b", DataType::Int8, true),
        ])
    ));
    // Different field names.
    assert!(!same(
        &struct_field,
        &struct_of(vec![
            Field::new("x", DataType::Int8, true),
            Field::new("y", DataType::Int8, true),
        ])
    ));
    // Different field type.
    assert!(!same(
        &struct_field,
        &struct_of(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("b", DataType::Int8, true),
        ])
    ));
    // Different number of fields.
    assert!(!same(
        &struct_field,
        &struct_of(vec![Field::new("a", DataType::Int8, true)])
    ));
}
1787
/// A dictionary type is logically equal to its plain value type.
#[test]
fn test_datatype_is_logically_equivalent_to_dictionary() {
    let plain = DataType::Utf8;
    let dictionary =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert!(DFSchema::datatype_is_logically_equal(&plain, &dictionary));
}
1796
/// Pins `DFSchema::datatype_is_semantically_equal` for primitive, decimal,
/// timestamp, list, map and struct types.
///
/// The struct cases call `datatype_is_semantically_equal`; previously they
/// called `datatype_is_logically_equal`, so the semantic comparison was
/// never exercised on structs.
#[test]
fn test_datatype_is_semantically_equal() {
    use arrow::datatypes::TimeUnit;

    // Small builders that keep the nested fixtures readable.
    let list_of = |item: Field| DataType::List(Arc::new(item));
    let struct_of = |members: Vec<Field>| DataType::Struct(Fields::from(members));
    let map_of = |entries: &str, key: Field, value: Field| {
        DataType::Map(
            Arc::new(Field::new(entries, struct_of(vec![key, value]), true)),
            true,
        )
    };
    let same = DFSchema::datatype_is_semantically_equal;

    // Primitives: equal only to themselves.
    assert!(same(&DataType::Int8, &DataType::Int8));
    assert!(!same(&DataType::Int8, &DataType::Int16));

    // Decimals of the same width are equal regardless of precision and scale.
    assert!(same(&DataType::Decimal32(1, 2), &DataType::Decimal32(2, 1)));
    assert!(same(&DataType::Decimal64(1, 2), &DataType::Decimal64(2, 1)));
    assert!(same(
        &DataType::Decimal128(1, 2),
        &DataType::Decimal128(2, 1)
    ));
    assert!(same(
        &DataType::Decimal256(1, 2),
        &DataType::Decimal256(2, 1)
    ));

    // Timestamps are equal even when unit and time zone differ.
    assert!(same(
        &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
        &DataType::Timestamp(TimeUnit::Millisecond, None),
    ));

    // Lists: equal when the item types match, even if the item field was
    // built with a different name and nullability.
    let int8_list = list_of(Field::new_list_field(DataType::Int8, true));
    assert!(same(
        &int8_list,
        &list_of(Field::new("element", DataType::Int8, false))
    ));
    // Lists with different item types are not equal.
    assert!(!same(
        &int8_list,
        &list_of(Field::new_list_field(DataType::Int16, true))
    ));

    // Maps: the entry/key/value field names and value nullability may
    // differ, but the key and value types must match.
    let map_field = map_of(
        "entries",
        Field::new("key", DataType::Int8, false),
        Field::new("value", DataType::Int8, true),
    );
    assert!(same(
        &map_field,
        &map_of(
            "pairs",
            Field::new("one", DataType::Int8, false),
            Field::new("two", DataType::Int8, false),
        )
    ));
    // Different value type.
    assert!(!same(
        &map_field,
        &map_of(
            "entries",
            Field::new("key", DataType::Int8, false),
            Field::new("value", DataType::Int16, true),
        )
    ));
    // Different key type.
    assert!(!same(
        &map_field,
        &map_of(
            "entries",
            Field::new("key", DataType::Int16, false),
            Field::new("value", DataType::Int8, true),
        )
    ));

    // Structs: field names, types and count must match; nullability may differ.
    let struct_field = struct_of(vec![
        Field::new("a", DataType::Int8, true),
        Field::new("b", DataType::Int8, true),
    ]);
    assert!(same(
        &struct_field,
        &struct_of(vec![
            Field::new("a", DataType::Int8, false),
            Field::new("b", DataType::Int8, true),
        ])
    ));
    // Different field names.
    assert!(!same(
        &struct_field,
        &struct_of(vec![
            Field::new("x", DataType::Int8, true),
            Field::new("y", DataType::Int8, true),
        ])
    ));
    // Different field type.
    assert!(!same(
        &struct_field,
        &struct_of(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("b", DataType::Int8, true),
        ])
    ));
    // Different number of fields.
    assert!(!same(
        &struct_field,
        &struct_of(vec![Field::new("a", DataType::Int8, true)])
    ));
}
1957
/// Unlike the logical comparison, the semantic comparison does not treat a
/// dictionary type as equal to its plain value type.
#[test]
fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
    let plain = DataType::Utf8;
    let dictionary =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert!(!DFSchema::datatype_is_semantically_equal(&plain, &dictionary));
}
1966
1967 fn test_schema_2() -> Schema {
1968 Schema::new(vec![
1969 Field::new("c100", DataType::Boolean, true),
1970 Field::new("c101", DataType::Boolean, true),
1971 ])
1972 }
1973
1974 fn test_metadata() -> HashMap<String, String> {
1975 test_metadata_n(2)
1976 }
1977
/// Fixture: a metadata map with `n` entries, mapping `k0..k{n-1}` to
/// `v0..v{n-1}`.
fn test_metadata_n(n: usize) -> HashMap<String, String> {
    let mut metadata = HashMap::new();
    for index in 0..n {
        metadata.insert(format!("k{index}"), format!("v{index}"));
    }
    metadata
}
1981
/// `tree_string` on an unqualified schema prints one line per field with
/// its type and nullability.
#[test]
fn test_print_schema_unqualified() {
    let fields: Vec<Field> = [
        ("id", DataType::Int32, false),
        ("name", DataType::Utf8, true),
        ("age", DataType::Int64, true),
        ("active", DataType::Boolean, false),
    ]
    .into_iter()
    .map(|(name, data_type, nullable)| Field::new(name, data_type, nullable))
    .collect();

    let schema =
        DFSchema::from_unqualified_fields(Fields::from(fields), HashMap::new()).unwrap();

    let rendered = schema.tree_string();

    insta::assert_snapshot!(rendered, @r"
    root
    |-- id: int32 (nullable = false)
    |-- name: utf8 (nullable = true)
    |-- age: int64 (nullable = true)
    |-- active: boolean (nullable = false)
    ");
}
2006
/// `tree_string` on a qualified schema prefixes each field with its table
/// qualifier.
#[test]
fn test_print_schema_qualified() {
    let arrow_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]);
    let schema = DFSchema::try_from_qualified_schema("table1", &arrow_schema).unwrap();

    let rendered = schema.tree_string();

    insta::assert_snapshot!(rendered, @r"
    root
    |-- table1.id: int32 (nullable = false)
    |-- table1.name: utf8 (nullable = true)
    ");
}
2026
/// `tree_string` renders struct members and list items as nested children
/// of their parent field.
#[test]
fn test_print_schema_complex_types() {
    let address = Field::new(
        "address",
        DataType::Struct(Fields::from(vec![
            Field::new("street", DataType::Utf8, true),
            Field::new("city", DataType::Utf8, true),
        ])),
        true,
    );

    let tag_item = Arc::new(Field::new("item", DataType::Utf8, true));
    let tags = Field::new("tags", DataType::List(tag_item), true);

    let fields = Fields::from(vec![
        Field::new("id", DataType::Int32, false),
        address,
        tags,
        Field::new("score", DataType::Decimal128(10, 2), true),
    ]);
    let schema = DFSchema::from_unqualified_fields(fields, HashMap::new()).unwrap();

    let rendered = schema.tree_string();
    insta::assert_snapshot!(rendered, @r"
    root
    |-- id: int32 (nullable = false)
    |-- address: struct (nullable = true)
    | |-- street: utf8 (nullable = true)
    | |-- city: utf8 (nullable = true)
    |-- tags: list (nullable = true)
    | |-- item: utf8 (nullable = true)
    |-- score: decimal128(10, 2) (nullable = true)
    ");
}
2068
/// An empty schema renders as just the `root` line.
#[test]
fn test_print_schema_empty() {
    let rendered = DFSchema::empty().tree_string();
    insta::assert_snapshot!(rendered, @"root");
}
2075
/// `tree_string` on a schema mixing a struct, a list of structs, a map with
/// list values and a timestamp with a time zone.
#[test]
fn test_print_schema_deeply_nested_types() {
    // Builders for the nested types; every list item here is named "item"
    // and nullable, and every map uses a non-nullable "entries" struct.
    let struct_of = |members: Vec<Field>| DataType::Struct(Fields::from(members));
    let list_of = |item_type: DataType| {
        DataType::List(Arc::new(Field::new("item", item_type, true)))
    };
    let map_of = |key: Field, value: Field| {
        DataType::Map(
            Arc::new(Field::new("entries", struct_of(vec![key, value]), false)),
            false,
        )
    };

    let inner_struct = Field::new(
        "inner",
        struct_of(vec![
            Field::new("level1", DataType::Utf8, true),
            Field::new("level2", DataType::Int32, false),
        ]),
        true,
    );

    let nested_list = Field::new(
        "nested_list",
        list_of(struct_of(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ])),
        true,
    );

    let map_data = Field::new(
        "map_data",
        map_of(
            Field::new("key", DataType::Utf8, false),
            Field::new("value", list_of(DataType::Int32), true),
        ),
        true,
    );

    let timestamp_field = Field::new(
        "timestamp_field",
        DataType::Timestamp(
            arrow::datatypes::TimeUnit::Microsecond,
            Some("UTC".into()),
        ),
        false,
    );

    let fields = Fields::from(vec![
        Field::new("simple_field", DataType::Utf8, true),
        inner_struct,
        nested_list,
        map_data,
        timestamp_field,
    ]);
    let schema = DFSchema::from_unqualified_fields(fields, HashMap::new()).unwrap();

    let rendered = schema.tree_string();

    insta::assert_snapshot!(rendered, @r"
    root
    |-- simple_field: utf8 (nullable = true)
    |-- inner: struct (nullable = true)
    | |-- level1: utf8 (nullable = true)
    | |-- level2: int32 (nullable = false)
    |-- nested_list: list (nullable = true)
    | |-- item: struct (nullable = true)
    | | |-- id: int64 (nullable = false)
    | | |-- value: float64 (nullable = true)
    |-- map_data: map (nullable = true)
    | |-- key: utf8 (nullable = false)
    | |-- value: list (nullable = true)
    | | |-- item: int32 (nullable = true)
    |-- timestamp_field: timestamp (UTC) (nullable = false)
    ");
}
2164
/// `tree_string` on a schema that interleaves qualified and unqualified
/// fields prints the qualifier only where one is set.
#[test]
fn test_print_schema_mixed_qualified_unqualified() {
    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = [
        (Some("table1"), Field::new("id", DataType::Int32, false)),
        (None, Field::new("name", DataType::Utf8, true)),
        (Some("table2"), Field::new("score", DataType::Float64, true)),
        (None, Field::new("active", DataType::Boolean, false)),
    ]
    .into_iter()
    .map(|(qualifier, field)| (qualifier.map(TableReference::from), Arc::new(field)))
    .collect();

    let schema = DFSchema::new_with_metadata(qualified_fields, HashMap::new()).unwrap();

    let rendered = schema.tree_string();

    insta::assert_snapshot!(rendered, @r"
    root
    |-- table1.id: int32 (nullable = false)
    |-- name: utf8 (nullable = true)
    |-- table2.score: float64 (nullable = true)
    |-- active: boolean (nullable = false)
    ");
}
2198
/// `tree_string` on a list whose items are maps: the map's key and value
/// are printed beneath the list item.
#[test]
fn test_print_schema_array_of_map() {
    let entries = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, false),
        ])),
        false,
    );
    let map_item = Field::new("item", DataType::Map(Arc::new(entries), false), false);
    let array_of_map_field = Field::new(
        "array_map_field",
        DataType::List(Arc::new(map_item)),
        false,
    );

    let fields = Fields::from(vec![array_of_map_field]);
    let schema = DFSchema::from_unqualified_fields(fields, HashMap::new()).unwrap();

    let rendered = schema.tree_string();

    insta::assert_snapshot!(rendered, @r"
    root
    |-- array_map_field: list (nullable = false)
    | |-- item: map (nullable = false)
    | | |-- key: utf8 (nullable = false)
    | | |-- value: utf8 (nullable = false)
    ");
}
2237
/// `tree_string` on combinations of nested types: list of structs, struct
/// with lists, map with struct values, list of maps, and a struct nesting a
/// list of structs that contain a map.
#[test]
fn test_print_schema_complex_type_combinations() {
    // Builders for the nested types; every list item here is named "item"
    // and nullable, and every map uses a non-nullable "entries" struct.
    let struct_of = |members: Vec<Field>| DataType::Struct(Fields::from(members));
    let list_of = |item_type: DataType| {
        DataType::List(Arc::new(Field::new("item", item_type, true)))
    };
    let map_of = |key: Field, value: Field| {
        DataType::Map(
            Arc::new(Field::new("entries", struct_of(vec![key, value]), false)),
            false,
        )
    };

    // List of structs.
    let list_of_structs = Field::new(
        "list_of_structs",
        list_of(struct_of(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
        ])),
        true,
    );

    // Struct containing lists.
    let struct_with_lists = Field::new(
        "struct_with_lists",
        struct_of(vec![
            Field::new("tags", list_of(DataType::Utf8), true),
            Field::new("scores", list_of(DataType::Int32), false),
            Field::new("metadata", DataType::Utf8, true),
        ]),
        false,
    );

    // Map whose values are structs.
    let map_with_struct_values = Field::new(
        "map_with_struct_values",
        map_of(
            Field::new("key", DataType::Utf8, false),
            Field::new(
                "value",
                struct_of(vec![
                    Field::new("count", DataType::Int64, false),
                    Field::new("active", DataType::Boolean, true),
                ]),
                true,
            ),
        ),
        true,
    );

    // List of maps.
    let list_of_maps = Field::new(
        "list_of_maps",
        list_of(map_of(
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        )),
        true,
    );

    // Struct -> list -> struct -> map.
    let properties = Field::new(
        "properties",
        map_of(
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ),
        true,
    );
    let level2 = Field::new(
        "level2",
        list_of(struct_of(vec![
            Field::new("id", DataType::Int32, false),
            properties,
        ])),
        false,
    );
    let deeply_nested = Field::new(
        "deeply_nested",
        struct_of(vec![Field::new("level1", DataType::Utf8, true), level2]),
        true,
    );

    let fields = Fields::from(vec![
        list_of_structs,
        struct_with_lists,
        map_with_struct_values,
        list_of_maps,
        deeply_nested,
    ]);
    let schema = DFSchema::from_unqualified_fields(fields, HashMap::new()).unwrap();

    let rendered = schema.tree_string();

    insta::assert_snapshot!(rendered, @r"
    root
    |-- list_of_structs: list (nullable = true)
    | |-- item: struct (nullable = true)
    | | |-- id: int32 (nullable = false)
    | | |-- name: utf8 (nullable = true)
    | | |-- score: float64 (nullable = true)
    |-- struct_with_lists: struct (nullable = false)
    | |-- tags: list (nullable = true)
    | | |-- item: utf8 (nullable = true)
    | |-- scores: list (nullable = false)
    | | |-- item: int32 (nullable = true)
    | |-- metadata: utf8 (nullable = true)
    |-- map_with_struct_values: map (nullable = true)
    | |-- key: utf8 (nullable = false)
    | |-- value: struct (nullable = true)
    | | |-- count: int64 (nullable = false)
    | | |-- active: boolean (nullable = true)
    |-- list_of_maps: list (nullable = true)
    | |-- item: map (nullable = true)
    | | |-- key: utf8 (nullable = false)
    | | |-- value: int32 (nullable = false)
    |-- deeply_nested: struct (nullable = true)
    | |-- level1: utf8 (nullable = true)
    | |-- level2: list (nullable = false)
    | | |-- item: struct (nullable = true)
    | | | |-- id: int32 (nullable = false)
    | | | |-- properties: map (nullable = true)
    | | | | |-- key: utf8 (nullable = false)
    | | | | |-- value: float64 (nullable = false)
    ");
}
2403
/// `tree_string` on less common types: null, binary and large variants,
/// fixed-size binary and list, all decimal widths, dates and times.
#[test]
fn test_print_schema_edge_case_types() {
    use arrow::datatypes::TimeUnit;

    let fixed_size_list = DataType::FixedSizeList(
        Arc::new(Field::new("item", DataType::Int32, true)),
        5,
    );

    // (name, type, nullable) for every field, in schema order.
    let fields: Vec<Field> = [
        ("null_field", DataType::Null, true),
        ("binary_field", DataType::Binary, false),
        ("large_binary", DataType::LargeBinary, true),
        ("large_utf8", DataType::LargeUtf8, false),
        ("fixed_size_binary", DataType::FixedSizeBinary(16), true),
        ("fixed_size_list", fixed_size_list, false),
        ("decimal32", DataType::Decimal32(9, 4), true),
        ("decimal64", DataType::Decimal64(9, 4), true),
        ("decimal128", DataType::Decimal128(18, 4), true),
        ("decimal256", DataType::Decimal256(38, 10), false),
        ("date32", DataType::Date32, true),
        ("date64", DataType::Date64, false),
        ("time32_seconds", DataType::Time32(TimeUnit::Second), true),
        (
            "time64_nanoseconds",
            DataType::Time64(TimeUnit::Nanosecond),
            false,
        ),
    ]
    .into_iter()
    .map(|(name, data_type, nullable)| Field::new(name, data_type, nullable))
    .collect();

    let schema =
        DFSchema::from_unqualified_fields(Fields::from(fields), HashMap::new()).unwrap();

    let rendered = schema.tree_string();

    insta::assert_snapshot!(rendered, @r"
    root
    |-- null_field: null (nullable = true)
    |-- binary_field: binary (nullable = false)
    |-- large_binary: large_binary (nullable = true)
    |-- large_utf8: large_utf8 (nullable = false)
    |-- fixed_size_binary: fixed_size_binary (nullable = true)
    |-- fixed_size_list: fixed size list (nullable = false)
    | |-- item: int32 (nullable = true)
    |-- decimal32: decimal32(9, 4) (nullable = true)
    |-- decimal64: decimal64(9, 4) (nullable = true)
    |-- decimal128: decimal128(18, 4) (nullable = true)
    |-- decimal256: decimal256(38, 10) (nullable = false)
    |-- date32: date32 (nullable = true)
    |-- date64: date64 (nullable = false)
    |-- time32_seconds: time32 (nullable = true)
    |-- time64_nanoseconds: time64 (nullable = false)
    ");
}
2465}