1use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{_plan_err, _schema_err, DataFusionError, Result};
27use crate::{
28 Column, FunctionalDependencies, SchemaError, TableReference, field_not_found,
29 unqualified_field_not_found,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34 DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
/// A shared, reference-counted [`DFSchema`].
pub type DFSchemaRef = Arc<DFSchema>;
39
/// A schema that wraps an Arrow [`Schema`] and adds an optional
/// [`TableReference`] qualifier to each field, together with the
/// [`FunctionalDependencies`] between those fields.
///
/// The derived `PartialEq`/`Eq` compare all three parts: the Arrow schema
/// (fields and metadata), the qualifiers and the functional dependencies.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// The underlying Arrow schema: fields plus schema-level metadata.
    inner: SchemaRef,
    /// Optional qualifier for each field. Entry `i` qualifies field `i` of
    /// `inner`, so this is expected to have one entry per field.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Functional dependencies between the fields of this schema.
    functional_dependencies: FunctionalDependencies,
}
121
122impl DFSchema {
123 pub fn empty() -> Self {
125 Self {
126 inner: Arc::new(Schema::new([])),
127 field_qualifiers: vec![],
128 functional_dependencies: FunctionalDependencies::empty(),
129 }
130 }
131
132 pub fn as_arrow(&self) -> &Schema {
136 self.inner.as_ref()
137 }
138
139 pub fn inner(&self) -> &SchemaRef {
143 &self.inner
144 }
145
146 pub fn new_with_metadata(
148 qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
149 metadata: HashMap<String, String>,
150 ) -> Result<Self> {
151 let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
152 qualified_fields.into_iter().unzip();
153
154 let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
155
156 let dfschema = Self {
157 inner: schema,
158 field_qualifiers: qualifiers,
159 functional_dependencies: FunctionalDependencies::empty(),
160 };
161 dfschema.check_names()?;
162 Ok(dfschema)
163 }
164
165 pub fn from_unqualified_fields(
167 fields: Fields,
168 metadata: HashMap<String, String>,
169 ) -> Result<Self> {
170 let field_count = fields.len();
171 let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
172 let dfschema = Self {
173 inner: schema,
174 field_qualifiers: vec![None; field_count],
175 functional_dependencies: FunctionalDependencies::empty(),
176 };
177 dfschema.check_names()?;
178 Ok(dfschema)
179 }
180
181 pub fn try_from_qualified_schema(
186 qualifier: impl Into<TableReference>,
187 schema: &Schema,
188 ) -> Result<Self> {
189 let qualifier = qualifier.into();
190 let schema = DFSchema {
191 inner: schema.clone().into(),
192 field_qualifiers: vec![Some(qualifier); schema.fields.len()],
193 functional_dependencies: FunctionalDependencies::empty(),
194 };
195 schema.check_names()?;
196 Ok(schema)
197 }
198
199 pub fn from_field_specific_qualified_schema(
201 qualifiers: Vec<Option<TableReference>>,
202 schema: &SchemaRef,
203 ) -> Result<Self> {
204 let dfschema = Self {
205 inner: Arc::clone(schema),
206 field_qualifiers: qualifiers,
207 functional_dependencies: FunctionalDependencies::empty(),
208 };
209 dfschema.check_names()?;
210 Ok(dfschema)
211 }
212
213 pub fn with_field_specific_qualified_schema(
215 &self,
216 qualifiers: Vec<Option<TableReference>>,
217 ) -> Result<Self> {
218 if qualifiers.len() != self.fields().len() {
219 return _plan_err!(
220 "Number of qualifiers must match number of fields. Expected {}, got {}",
221 self.fields().len(),
222 qualifiers.len()
223 );
224 }
225 Ok(DFSchema {
226 inner: Arc::clone(&self.inner),
227 field_qualifiers: qualifiers,
228 functional_dependencies: self.functional_dependencies.clone(),
229 })
230 }
231
232 pub fn check_names(&self) -> Result<()> {
234 let mut qualified_names = BTreeSet::new();
235 let mut unqualified_names = BTreeSet::new();
236
237 for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
238 if let Some(qualifier) = qualifier {
239 if !qualified_names.insert((qualifier, field.name())) {
240 return _schema_err!(SchemaError::DuplicateQualifiedField {
241 qualifier: Box::new(qualifier.clone()),
242 name: field.name().to_string(),
243 });
244 }
245 } else if !unqualified_names.insert(field.name()) {
246 return _schema_err!(SchemaError::DuplicateUnqualifiedField {
247 name: field.name().to_string()
248 });
249 }
250 }
251
252 for (qualifier, name) in qualified_names {
253 if unqualified_names.contains(name) {
254 return _schema_err!(SchemaError::AmbiguousReference {
255 field: Box::new(Column::new(Some(qualifier.clone()), name))
256 });
257 }
258 }
259 Ok(())
260 }
261
262 pub fn with_functional_dependencies(
264 mut self,
265 functional_dependencies: FunctionalDependencies,
266 ) -> Result<Self> {
267 if functional_dependencies.is_valid(self.inner.fields.len()) {
268 self.functional_dependencies = functional_dependencies;
269 Ok(self)
270 } else {
271 _plan_err!(
272 "Invalid functional dependency: {:?}",
273 functional_dependencies
274 )
275 }
276 }
277
278 pub fn join(&self, schema: &DFSchema) -> Result<Self> {
281 let mut schema_builder = SchemaBuilder::new();
282 schema_builder.extend(self.inner.fields().iter().cloned());
283 schema_builder.extend(schema.fields().iter().cloned());
284 let new_schema = schema_builder.finish();
285
286 let mut new_metadata = self.inner.metadata.clone();
287 new_metadata.extend(schema.inner.metadata.clone());
288 let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
289
290 let mut new_qualifiers = self.field_qualifiers.clone();
291 new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
292
293 let new_self = Self {
294 inner: Arc::new(new_schema_with_metadata),
295 field_qualifiers: new_qualifiers,
296 functional_dependencies: FunctionalDependencies::empty(),
297 };
298 new_self.check_names()?;
299 Ok(new_self)
300 }
301
302 pub fn merge(&mut self, other_schema: &DFSchema) {
319 if other_schema.inner.fields.is_empty() {
320 return;
321 }
322
323 let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
324 self.iter().collect();
325 let self_unqualified_names: HashSet<&str> = self
326 .inner
327 .fields
328 .iter()
329 .map(|field| field.name().as_str())
330 .collect();
331
332 let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
333 let mut qualifiers = Vec::new();
334 for (qualifier, field) in other_schema.iter() {
335 let duplicated_field = match qualifier {
337 Some(q) => self_fields.contains(&(Some(q), field)),
338 None => self_unqualified_names.contains(field.name().as_str()),
340 };
341 if !duplicated_field {
342 schema_builder.push(Arc::clone(field));
343 qualifiers.push(qualifier.cloned());
344 }
345 }
346 let mut metadata = self.inner.metadata.clone();
347 metadata.extend(other_schema.inner.metadata.clone());
348
349 let finished = schema_builder.finish();
350 let finished_with_metadata = finished.with_metadata(metadata);
351 self.inner = finished_with_metadata.into();
352 self.field_qualifiers.extend(qualifiers);
353 }
354
355 pub fn fields(&self) -> &Fields {
357 &self.inner.fields
358 }
359
360 pub fn field(&self, i: usize) -> &FieldRef {
365 &self.inner.fields[i]
366 }
367
368 pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &FieldRef) {
371 (self.field_qualifiers[i].as_ref(), self.field(i))
372 }
373
374 pub fn index_of_column_by_name(
375 &self,
376 qualifier: Option<&TableReference>,
377 name: &str,
378 ) -> Option<usize> {
379 let mut matches = self
380 .iter()
381 .enumerate()
382 .filter(|(_, (q, f))| match (qualifier, q) {
383 (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
387 (Some(_), None) => false,
389 (None, Some(_)) | (None, None) => f.name() == name,
391 })
392 .map(|(idx, _)| idx);
393 matches.next()
394 }
395
396 pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
402 self.index_of_column_by_name(col.relation.as_ref(), &col.name)
403 }
404
405 pub fn index_of_column(&self, col: &Column) -> Result<usize> {
411 self.maybe_index_of_column(col)
412 .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
413 }
414
415 pub fn is_column_from_schema(&self, col: &Column) -> bool {
417 self.index_of_column_by_name(col.relation.as_ref(), &col.name)
418 .is_some()
419 }
420
421 pub fn field_with_name(
423 &self,
424 qualifier: Option<&TableReference>,
425 name: &str,
426 ) -> Result<&FieldRef> {
427 if let Some(qualifier) = qualifier {
428 self.field_with_qualified_name(qualifier, name)
429 } else {
430 self.field_with_unqualified_name(name)
431 }
432 }
433
434 pub fn qualified_field_with_name(
436 &self,
437 qualifier: Option<&TableReference>,
438 name: &str,
439 ) -> Result<(Option<&TableReference>, &FieldRef)> {
440 if let Some(qualifier) = qualifier {
441 let idx = self
442 .index_of_column_by_name(Some(qualifier), name)
443 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
444 Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
445 } else {
446 self.qualified_field_with_unqualified_name(name)
447 }
448 }
449
450 pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&FieldRef> {
452 self.iter()
453 .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
454 .map(|(_, f)| f)
455 .collect()
456 }
457
458 pub fn fields_indices_with_qualified(
460 &self,
461 qualifier: &TableReference,
462 ) -> Vec<usize> {
463 self.iter()
464 .enumerate()
465 .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
466 .collect()
467 }
468
469 pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&FieldRef> {
471 self.fields()
472 .iter()
473 .filter(|field| field.name() == name)
474 .collect()
475 }
476
477 pub fn qualified_fields_with_unqualified_name(
479 &self,
480 name: &str,
481 ) -> Vec<(Option<&TableReference>, &FieldRef)> {
482 self.iter()
483 .filter(|(_, field)| field.name() == name)
484 .collect()
485 }
486
487 pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
489 self.iter()
490 .filter(|(_, field)| field.name() == name)
491 .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
492 .collect()
493 }
494
495 pub fn columns(&self) -> Vec<Column> {
497 self.iter()
498 .map(|(qualifier, field)| {
499 Column::new(qualifier.cloned(), field.name().clone())
500 })
501 .collect()
502 }
503
504 pub fn qualified_field_with_unqualified_name(
506 &self,
507 name: &str,
508 ) -> Result<(Option<&TableReference>, &FieldRef)> {
509 let matches = self.qualified_fields_with_unqualified_name(name);
510 match matches.len() {
511 0 => Err(unqualified_field_not_found(name, self)),
512 1 => Ok((matches[0].0, matches[0].1)),
513 _ => {
514 let fields_without_qualifier = matches
522 .iter()
523 .filter(|(q, _)| q.is_none())
524 .collect::<Vec<_>>();
525 if fields_without_qualifier.len() == 1 {
526 Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
527 } else {
528 _schema_err!(SchemaError::AmbiguousReference {
529 field: Box::new(Column::new_unqualified(name.to_string()))
530 })
531 }
532 }
533 }
534 }
535
536 pub fn field_with_unqualified_name(&self, name: &str) -> Result<&FieldRef> {
538 self.qualified_field_with_unqualified_name(name)
539 .map(|(_, field)| field)
540 }
541
542 pub fn field_with_qualified_name(
544 &self,
545 qualifier: &TableReference,
546 name: &str,
547 ) -> Result<&FieldRef> {
548 let idx = self
549 .index_of_column_by_name(Some(qualifier), name)
550 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
551
552 Ok(self.field(idx))
553 }
554
555 pub fn qualified_field_from_column(
557 &self,
558 column: &Column,
559 ) -> Result<(Option<&TableReference>, &FieldRef)> {
560 self.qualified_field_with_name(column.relation.as_ref(), &column.name)
561 }
562
563 pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
565 self.fields().iter().any(|field| field.name() == name)
566 }
567
568 pub fn has_column_with_qualified_name(
570 &self,
571 qualifier: &TableReference,
572 name: &str,
573 ) -> bool {
574 self.iter()
575 .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
576 }
577
578 pub fn has_column(&self, column: &Column) -> bool {
580 match &column.relation {
581 Some(r) => self.has_column_with_qualified_name(r, &column.name),
582 None => self.has_column_with_unqualified_name(&column.name),
583 }
584 }
585
586 pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
588 self.inner
589 .fields
590 .iter()
591 .zip(arrow_schema.fields().iter())
592 .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
593 }
594
595 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
597 pub fn check_arrow_schema_type_compatible(
598 &self,
599 arrow_schema: &Schema,
600 ) -> Result<()> {
601 let self_arrow_schema = self.as_arrow();
602 self_arrow_schema
603 .fields()
604 .iter()
605 .zip(arrow_schema.fields().iter())
606 .try_for_each(|(l_field, r_field)| {
607 if !can_cast_types(r_field.data_type(), l_field.data_type()) {
608 _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
609 r_field.name(),
610 r_field.data_type(),
611 l_field.name(),
612 l_field.data_type())
613 } else {
614 Ok(())
615 }
616 })
617 }
618
619 pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
625 if self.fields().len() != other.fields().len() {
626 return false;
627 }
628 let self_fields = self.iter();
629 let other_fields = other.iter();
630 self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
631 q1 == q2
632 && f1.name() == f2.name()
633 && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
634 })
635 }
636
637 #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
638 pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
639 self.has_equivalent_names_and_types(other).is_ok()
640 }
641
642 pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
654 if self.fields().len() != other.fields().len() {
656 _plan_err!(
657 "Schema mismatch: the schema length are not same \
658 Expected schema length: {}, got: {}",
659 self.fields().len(),
660 other.fields().len()
661 )
662 } else {
663 self.fields()
666 .iter()
667 .zip(other.fields().iter())
668 .try_for_each(|(f1, f2)| {
669 if f1.name() != f2.name()
670 || (!DFSchema::datatype_is_semantically_equal(
671 f1.data_type(),
672 f2.data_type(),
673 ))
674 {
675 _plan_err!(
676 "Schema mismatch: Expected field '{}' with type {}, \
677 but got '{}' with type {}.",
678 f1.name(),
679 f1.data_type(),
680 f2.name(),
681 f2.data_type()
682 )
683 } else {
684 Ok(())
685 }
686 })
687 }
688 }
689
690 pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
698 match (dt1, dt2) {
700 (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
701 Self::datatype_is_logically_equal(v1.as_ref(), v2.as_ref())
702 }
703 (DataType::Dictionary(_, v1), othertype)
704 | (othertype, DataType::Dictionary(_, v1)) => {
705 Self::datatype_is_logically_equal(v1.as_ref(), othertype)
706 }
707 (DataType::List(f1), DataType::List(f2))
708 | (DataType::LargeList(f1), DataType::LargeList(f2))
709 | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
710 Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
713 }
714 (DataType::Map(f1, _), DataType::Map(f2, _)) => {
715 match (f1.data_type(), f2.data_type()) {
718 (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
719 f1_inner.len() == f2_inner.len()
720 && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
721 Self::datatype_is_logically_equal(
722 f1.data_type(),
723 f2.data_type(),
724 )
725 })
726 }
727 _ => panic!("Map type should have an inner struct field"),
728 }
729 }
730 (DataType::Struct(fields1), DataType::Struct(fields2)) => {
731 let iter1 = fields1.iter();
732 let iter2 = fields2.iter();
733 fields1.len() == fields2.len() &&
734 iter1
736 .zip(iter2)
737 .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
738 }
739 (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
740 let iter1 = fields1.iter();
741 let iter2 = fields2.iter();
742 fields1.len() == fields2.len() &&
743 iter1
745 .zip(iter2)
746 .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
747 }
748 (DataType::Utf8, DataType::Utf8View) => true,
750 (DataType::Utf8View, DataType::Utf8) => true,
751 _ => Self::datatype_is_semantically_equal(dt1, dt2),
752 }
753 }
754
755 pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
761 match (dt1, dt2) {
763 (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
764 Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
765 && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
766 }
767 (DataType::List(f1), DataType::List(f2))
768 | (DataType::LargeList(f1), DataType::LargeList(f2))
769 | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
770 Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
773 }
774 (DataType::Map(f1, _), DataType::Map(f2, _)) => {
775 match (f1.data_type(), f2.data_type()) {
778 (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
779 f1_inner.len() == f2_inner.len()
780 && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
781 Self::datatype_is_semantically_equal(
782 f1.data_type(),
783 f2.data_type(),
784 )
785 })
786 }
787 _ => panic!("Map type should have an inner struct field"),
788 }
789 }
790 (DataType::Struct(fields1), DataType::Struct(fields2)) => {
791 let iter1 = fields1.iter();
792 let iter2 = fields2.iter();
793 fields1.len() == fields2.len() &&
794 iter1
796 .zip(iter2)
797 .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
798 }
799 (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
800 let iter1 = fields1.iter();
801 let iter2 = fields2.iter();
802 fields1.len() == fields2.len() &&
803 iter1
805 .zip(iter2)
806 .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
807 }
808 (
809 DataType::Decimal32(_l_precision, _l_scale),
810 DataType::Decimal32(_r_precision, _r_scale),
811 ) => true,
812 (
813 DataType::Decimal64(_l_precision, _l_scale),
814 DataType::Decimal64(_r_precision, _r_scale),
815 ) => true,
816 (
817 DataType::Decimal128(_l_precision, _l_scale),
818 DataType::Decimal128(_r_precision, _r_scale),
819 ) => true,
820 (
821 DataType::Decimal256(_l_precision, _l_scale),
822 DataType::Decimal256(_r_precision, _r_scale),
823 ) => true,
824 (
825 DataType::Timestamp(_l_time_unit, _l_timezone),
826 DataType::Timestamp(_r_time_unit, _r_timezone),
827 ) => true,
828 _ => dt1 == dt2,
829 }
830 }
831
832 fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
833 f1.name() == f2.name()
834 && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
835 }
836
837 fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
838 f1.name() == f2.name()
839 && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
840 }
841
842 pub fn strip_qualifiers(self) -> Self {
844 DFSchema {
845 field_qualifiers: vec![None; self.inner.fields.len()],
846 inner: self.inner,
847 functional_dependencies: self.functional_dependencies,
848 }
849 }
850
851 pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
853 let qualifier = qualifier.into();
854 DFSchema {
855 field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
856 inner: self.inner,
857 functional_dependencies: self.functional_dependencies,
858 }
859 }
860
861 pub fn field_names(&self) -> Vec<String> {
863 self.iter()
864 .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
865 .collect::<Vec<_>>()
866 }
867
868 pub fn metadata(&self) -> &HashMap<String, String> {
870 &self.inner.metadata
871 }
872
873 pub fn functional_dependencies(&self) -> &FunctionalDependencies {
875 &self.functional_dependencies
876 }
877
878 pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
880 self.field_qualifiers
881 .iter()
882 .zip(self.inner.fields().iter())
883 .map(|(qualifier, field)| (qualifier.as_ref(), field))
884 }
885 pub fn tree_string(&self) -> impl Display + '_ {
915 let mut result = String::from("root\n");
916
917 for (qualifier, field) in self.iter() {
918 let field_name = match qualifier {
919 Some(q) => format!("{}.{}", q, field.name()),
920 None => field.name().to_string(),
921 };
922
923 format_field_with_indent(
924 &mut result,
925 &field_name,
926 field.data_type(),
927 field.is_nullable(),
928 " ",
929 );
930 }
931
932 if result.ends_with('\n') {
934 result.pop();
935 }
936
937 result
938 }
939}
940
941fn format_field_with_indent(
943 result: &mut String,
944 field_name: &str,
945 data_type: &DataType,
946 nullable: bool,
947 indent: &str,
948) {
949 let nullable_str = nullable.to_string().to_lowercase();
950 let child_indent = format!("{indent}| ");
951
952 match data_type {
953 DataType::List(field) => {
954 result.push_str(&format!(
955 "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
956 ));
957 format_field_with_indent(
958 result,
959 field.name(),
960 field.data_type(),
961 field.is_nullable(),
962 &child_indent,
963 );
964 }
965 DataType::LargeList(field) => {
966 result.push_str(&format!(
967 "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
968 ));
969 format_field_with_indent(
970 result,
971 field.name(),
972 field.data_type(),
973 field.is_nullable(),
974 &child_indent,
975 );
976 }
977 DataType::FixedSizeList(field, _size) => {
978 result.push_str(&format!(
979 "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
980 ));
981 format_field_with_indent(
982 result,
983 field.name(),
984 field.data_type(),
985 field.is_nullable(),
986 &child_indent,
987 );
988 }
989 DataType::Map(field, _) => {
990 result.push_str(&format!(
991 "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
992 ));
993 if let DataType::Struct(inner_fields) = field.data_type()
994 && inner_fields.len() == 2
995 {
996 format_field_with_indent(
997 result,
998 "key",
999 inner_fields[0].data_type(),
1000 inner_fields[0].is_nullable(),
1001 &child_indent,
1002 );
1003 let value_contains_null = field.is_nullable().to_string().to_lowercase();
1004 match inner_fields[1].data_type() {
1006 DataType::Struct(_)
1007 | DataType::List(_)
1008 | DataType::LargeList(_)
1009 | DataType::FixedSizeList(_, _)
1010 | DataType::Map(_, _) => {
1011 format_field_with_indent(
1012 result,
1013 "value",
1014 inner_fields[1].data_type(),
1015 inner_fields[1].is_nullable(),
1016 &child_indent,
1017 );
1018 }
1019 _ => {
1020 result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1021 format_simple_data_type(inner_fields[1].data_type())));
1022 }
1023 }
1024 }
1025 }
1026 DataType::Struct(fields) => {
1027 result.push_str(&format!(
1028 "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1029 ));
1030 for struct_field in fields {
1031 format_field_with_indent(
1032 result,
1033 struct_field.name(),
1034 struct_field.data_type(),
1035 struct_field.is_nullable(),
1036 &child_indent,
1037 );
1038 }
1039 }
1040 _ => {
1041 let type_str = format_simple_data_type(data_type);
1042 result.push_str(&format!(
1043 "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1044 ));
1045 }
1046 }
1047}
1048
1049fn format_simple_data_type(data_type: &DataType) -> String {
1051 match data_type {
1052 DataType::Boolean => "boolean".to_string(),
1053 DataType::Int8 => "int8".to_string(),
1054 DataType::Int16 => "int16".to_string(),
1055 DataType::Int32 => "int32".to_string(),
1056 DataType::Int64 => "int64".to_string(),
1057 DataType::UInt8 => "uint8".to_string(),
1058 DataType::UInt16 => "uint16".to_string(),
1059 DataType::UInt32 => "uint32".to_string(),
1060 DataType::UInt64 => "uint64".to_string(),
1061 DataType::Float16 => "float16".to_string(),
1062 DataType::Float32 => "float32".to_string(),
1063 DataType::Float64 => "float64".to_string(),
1064 DataType::Utf8 => "utf8".to_string(),
1065 DataType::LargeUtf8 => "large_utf8".to_string(),
1066 DataType::Binary => "binary".to_string(),
1067 DataType::LargeBinary => "large_binary".to_string(),
1068 DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1069 DataType::Date32 => "date32".to_string(),
1070 DataType::Date64 => "date64".to_string(),
1071 DataType::Time32(_) => "time32".to_string(),
1072 DataType::Time64(_) => "time64".to_string(),
1073 DataType::Timestamp(_, tz) => match tz {
1074 Some(tz_str) => format!("timestamp ({tz_str})"),
1075 None => "timestamp".to_string(),
1076 },
1077 DataType::Interval(_) => "interval".to_string(),
1078 DataType::Dictionary(_, value_type) => {
1079 format_simple_data_type(value_type.as_ref())
1080 }
1081 DataType::Decimal32(precision, scale) => {
1082 format!("decimal32({precision}, {scale})")
1083 }
1084 DataType::Decimal64(precision, scale) => {
1085 format!("decimal64({precision}, {scale})")
1086 }
1087 DataType::Decimal128(precision, scale) => {
1088 format!("decimal128({precision}, {scale})")
1089 }
1090 DataType::Decimal256(precision, scale) => {
1091 format!("decimal256({precision}, {scale})")
1092 }
1093 DataType::Null => "null".to_string(),
1094 _ => format!("{data_type}").to_lowercase(),
1095 }
1096}
1097
1098impl AsRef<Schema> for DFSchema {
1100 fn as_ref(&self) -> &Schema {
1101 self.as_arrow()
1102 }
1103}
1104
1105impl AsRef<SchemaRef> for DFSchema {
1108 fn as_ref(&self) -> &SchemaRef {
1109 self.inner()
1110 }
1111}
1112
1113impl TryFrom<Schema> for DFSchema {
1115 type Error = DataFusionError;
1116 fn try_from(schema: Schema) -> Result<Self, Self::Error> {
1117 Self::try_from(Arc::new(schema))
1118 }
1119}
1120
1121impl TryFrom<SchemaRef> for DFSchema {
1122 type Error = DataFusionError;
1123 fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1124 let field_count = schema.fields.len();
1125 let dfschema = Self {
1126 inner: schema,
1127 field_qualifiers: vec![None; field_count],
1128 functional_dependencies: FunctionalDependencies::empty(),
1129 };
1130 Ok(dfschema)
1136 }
1137}
1138
1139impl From<DFSchema> for SchemaRef {
1140 fn from(dfschema: DFSchema) -> Self {
1141 Arc::clone(&dfschema.inner)
1142 }
1143}
1144
1145impl Hash for DFSchema {
1147 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1148 self.inner.fields.hash(state);
1149 self.inner.metadata.len().hash(state); }
1151}
1152
1153pub trait ToDFSchema
1155where
1156 Self: Sized,
1157{
1158 fn to_dfschema(self) -> Result<DFSchema>;
1160
1161 fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1163 Ok(Arc::new(self.to_dfschema()?))
1164 }
1165}
1166
1167impl ToDFSchema for Schema {
1168 fn to_dfschema(self) -> Result<DFSchema> {
1169 DFSchema::try_from(self)
1170 }
1171}
1172
1173impl ToDFSchema for SchemaRef {
1174 fn to_dfschema(self) -> Result<DFSchema> {
1175 DFSchema::try_from(self)
1176 }
1177}
1178
1179impl ToDFSchema for Vec<Field> {
1180 fn to_dfschema(self) -> Result<DFSchema> {
1181 let field_count = self.len();
1182 let schema = Schema {
1183 fields: self.into(),
1184 metadata: HashMap::new(),
1185 };
1186 let dfschema = DFSchema {
1187 inner: schema.into(),
1188 field_qualifiers: vec![None; field_count],
1189 functional_dependencies: FunctionalDependencies::empty(),
1190 };
1191 Ok(dfschema)
1192 }
1193}
1194
1195impl Display for DFSchema {
1196 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1197 write!(
1198 f,
1199 "fields:[{}], metadata:{:?}",
1200 self.iter()
1201 .map(|(q, f)| qualified_name(q, f.name()))
1202 .collect::<Vec<String>>()
1203 .join(", "),
1204 self.inner.metadata
1205 )
1206 }
1207}
1208
1209pub trait ExprSchema: std::fmt::Debug {
1215 fn nullable(&self, col: &Column) -> Result<bool> {
1217 Ok(self.field_from_column(col)?.is_nullable())
1218 }
1219
1220 fn data_type(&self, col: &Column) -> Result<&DataType> {
1222 Ok(self.field_from_column(col)?.data_type())
1223 }
1224
1225 fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1227 Ok(self.field_from_column(col)?.metadata())
1228 }
1229
1230 fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1232 let field = self.field_from_column(col)?;
1233 Ok((field.data_type(), field.is_nullable()))
1234 }
1235
1236 fn field_from_column(&self, col: &Column) -> Result<&FieldRef>;
1238}
1239
1240impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1242 fn nullable(&self, col: &Column) -> Result<bool> {
1243 self.as_ref().nullable(col)
1244 }
1245
1246 fn data_type(&self, col: &Column) -> Result<&DataType> {
1247 self.as_ref().data_type(col)
1248 }
1249
1250 fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1251 ExprSchema::metadata(self.as_ref(), col)
1252 }
1253
1254 fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1255 self.as_ref().data_type_and_nullable(col)
1256 }
1257
1258 fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1259 self.as_ref().field_from_column(col)
1260 }
1261}
1262
1263impl ExprSchema for DFSchema {
1264 fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
1265 match &col.relation {
1266 Some(r) => self.field_with_qualified_name(r, &col.name),
1267 None => self.field_with_unqualified_name(&col.name),
1268 }
1269 }
1270}
1271
/// Extra schema comparisons for Arrow [`Schema`].
pub trait SchemaExt {
    /// Returns `true` if both schemas have the same number of fields and,
    /// position by position, equal names and semantically equal types (as
    /// implemented for `Schema`, via `DFSchema::datatype_is_semantically_equal`).
    /// Nullability and metadata are not compared.
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Checks that `other` (for example, the schema of an inserting query) is
    /// compatible with `self` (for example, a table schema).
    ///
    /// As implemented for `Schema`, both schemas must have the same number of
    /// fields and equal names position by position. Each pair of types must
    /// either be logically equal (`DFSchema::datatype_is_logically_equal`) or
    /// allow a cast from `other`'s type to `self`'s type.
    ///
    /// # Errors
    ///
    /// Returns a plan error describing the first mismatch.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1289
1290impl SchemaExt for Schema {
1291 fn equivalent_names_and_types(&self, other: &Self) -> bool {
1292 if self.fields().len() != other.fields().len() {
1293 return false;
1294 }
1295
1296 self.fields()
1297 .iter()
1298 .zip(other.fields().iter())
1299 .all(|(f1, f2)| {
1300 f1.name() == f2.name()
1301 && DFSchema::datatype_is_semantically_equal(
1302 f1.data_type(),
1303 f2.data_type(),
1304 )
1305 })
1306 }
1307
1308 fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1310 if self.fields().len() != other.fields().len() {
1312 _plan_err!(
1313 "Inserting query must have the same schema length as the table. \
1314 Expected table schema length: {}, got: {}",
1315 self.fields().len(),
1316 other.fields().len()
1317 )
1318 } else {
1319 self.fields()
1322 .iter()
1323 .zip(other.fields().iter())
1324 .try_for_each(|(f1, f2)| {
1325 if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1326 _plan_err!(
1327 "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1328 but got '{}' with type {}.",
1329 f1.name(),
1330 f1.data_type(),
1331 f2.name(),
1332 f2.data_type())
1333 } else {
1334 Ok(())
1335 }
1336 })
1337 }
1338 }
1339}
1340
1341pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1342 match qualifier {
1343 Some(q) => format!("{q}.{name}"),
1344 None => name.to_string(),
1345 }
1346}
1347
#[cfg(test)]
mod tests {
    use crate::assert_contains;

    use super::*;

    /// An unqualified column whose name contains a dot is not resolved against
    /// a qualified field; the error points at quoting / the qualified form.
    #[test]
    fn qualifier_in_name() -> Result<()> {
        let col = Column::from_name("t1.c0");
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        // lookup using the unqualified name "t1.c0"
        let err = schema.index_of_column(&col).unwrap_err();
        let expected = "Schema error: No field named \"t1.c0\". \
            Column names are case sensitive. \
            You can use double quotes to refer to the \"\"t1.c0\"\" column \
            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
            Did you mean 't1.c0'?.";
        assert_eq!(err.strip_backtrace(), expected);
        Ok(())
    }

    /// Field names with capitals or embedded periods are listed quoted in the
    /// "Valid fields" hint.
    #[test]
    fn quoted_qualifiers_in_name() -> Result<()> {
        let col = Column::from_name("t1.c0");
        let schema = DFSchema::try_from_qualified_schema(
            "t1",
            &Schema::new(vec![
                Field::new("CapitalColumn", DataType::Boolean, true),
                Field::new("field.with.period", DataType::Boolean, true),
            ]),
        )?;

        // lookup using the unqualified name "t1.c0"
        let err = schema.index_of_column(&col).unwrap_err();
        let expected = "Schema error: No field named \"t1.c0\". \
            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
        assert_eq!(err.strip_backtrace(), expected);
        Ok(())
    }

    #[test]
    fn from_unqualified_schema() -> Result<()> {
        let schema = DFSchema::try_from(test_schema_1())?;
        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
        Ok(())
    }

    #[test]
    fn from_qualified_schema() -> Result<()> {
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
        Ok(())
    }

    /// Each field can carry its own (optional) qualifier.
    #[test]
    fn test_from_field_specific_qualified_schema() -> Result<()> {
        let schema = DFSchema::from_field_specific_qualified_schema(
            vec![Some("t1".into()), None],
            &Arc::new(Schema::new(vec![
                Field::new("c0", DataType::Boolean, true),
                Field::new("c1", DataType::Boolean, true),
            ])),
        )?;
        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
        Ok(())
    }

    #[test]
    fn test_from_qualified_fields() -> Result<()> {
        let schema = DFSchema::new_with_metadata(
            vec![
                (
                    Some("t0".into()),
                    Arc::new(Field::new("c0", DataType::Boolean, true)),
                ),
                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
            ],
            HashMap::new(),
        )?;
        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
        Ok(())
    }

    /// The Arrow view of a qualified schema carries bare field names only.
    #[test]
    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let arrow_schema = schema.as_arrow();
        insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
        Ok(())
    }

    #[test]
    fn join_qualified() -> Result<()> {
        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
        let join = left.join(&right)?;
        assert_eq!(
            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
            join.to_string()
        );
        // valid access: fully qualified lookups
        assert!(
            join.field_with_qualified_name(&TableReference::bare("t1"), "c0")
                .is_ok()
        );
        assert!(
            join.field_with_qualified_name(&TableReference::bare("t2"), "c0")
                .is_ok()
        );
        // `c0` exists under both qualifiers, so the unqualified lookup fails
        assert!(join.field_with_unqualified_name("c0").is_err());
        // a qualified name is not accepted as an unqualified field name
        assert!(join.field_with_unqualified_name("t1.c0").is_err());
        assert!(join.field_with_unqualified_name("t2.c0").is_err());
        Ok(())
    }

    #[test]
    fn join_qualified_duplicate() -> Result<()> {
        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let join = left.join(&right);
        assert_eq!(
            join.unwrap_err().strip_backtrace(),
            "Schema error: Schema contains duplicate qualified field name t1.c0",
        );
        Ok(())
    }

    #[test]
    fn join_unqualified_duplicate() -> Result<()> {
        let left = DFSchema::try_from(test_schema_1())?;
        let right = DFSchema::try_from(test_schema_1())?;
        let join = left.join(&right);
        assert_eq!(
            join.unwrap_err().strip_backtrace(),
            "Schema error: Schema contains duplicate unqualified field name c0"
        );
        Ok(())
    }

    #[test]
    fn join_mixed() -> Result<()> {
        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let right = DFSchema::try_from(test_schema_2())?;
        let join = left.join(&right)?;
        assert_eq!(
            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
            join.to_string()
        );
        // valid access
        assert!(
            join.field_with_qualified_name(&TableReference::bare("t1"), "c0")
                .is_ok()
        );
        assert!(join.field_with_unqualified_name("c0").is_ok());
        assert!(join.field_with_unqualified_name("c100").is_ok());
        assert!(join.field_with_name(None, "c100").is_ok());
        // invalid access
        assert!(join.field_with_unqualified_name("t1.c0").is_err());
        assert!(join.field_with_unqualified_name("t1.c100").is_err());
        assert!(
            join.field_with_qualified_name(&TableReference::bare(""), "c100")
                .is_err()
        );
        Ok(())
    }

    #[test]
    fn join_mixed_duplicate() -> Result<()> {
        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let right = DFSchema::try_from(test_schema_1())?;
        let join = left.join(&right);
        assert_contains!(
            join.unwrap_err().to_string(),
            "Schema error: Schema contains qualified \
             field name t1.c0 and unqualified field name c0 which would be ambiguous"
        );
        Ok(())
    }

    /// Lookup failures list the fields that do exist.
    #[test]
    fn helpful_error_messages() -> Result<()> {
        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
        let expected_help = "Valid fields are t1.c0, t1.c1.";
        assert_contains!(
            schema
                .field_with_qualified_name(&TableReference::bare("x"), "y")
                .unwrap_err()
                .to_string(),
            expected_help
        );
        assert_contains!(
            schema
                .field_with_unqualified_name("y")
                .unwrap_err()
                .to_string(),
            expected_help
        );
        assert!(schema.index_of_column_by_name(None, "y").is_none());
        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());

        Ok(())
    }

    /// With no fields at all, the error omits the "Valid fields" hint.
    #[test]
    fn select_without_valid_fields() {
        let schema = DFSchema::empty();

        let col = Column::from_qualified_name("t1.c0");
        let err = schema.index_of_column(&col).unwrap_err();
        let expected = "Schema error: No field named t1.c0.";
        assert_eq!(err.strip_backtrace(), expected);

        // same check for an unqualified column
        let col = Column::from_name("c0");
        let err = schema.index_of_column(&col).unwrap_err();
        let expected = "Schema error: No field named c0.";
        assert_eq!(err.strip_backtrace(), expected);
    }

    /// `Schema` and `SchemaRef` convert to equal `DFSchema` / `DFSchemaRef`
    /// values, including schema-level metadata.
    #[test]
    fn into() {
        // schema with metadata
        let arrow_schema = Schema::new_with_metadata(
            vec![Field::new("c0", DataType::Int64, true)],
            test_metadata(),
        );
        let arrow_schema_ref = Arc::new(arrow_schema.clone());

        let df_schema = DFSchema {
            inner: Arc::clone(&arrow_schema_ref),
            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
            functional_dependencies: FunctionalDependencies::empty(),
        };
        let df_schema_ref = Arc::new(df_schema.clone());

        // The scoped blocks convert clones so the originals remain available
        // for the final conversions below.
        {
            let arrow_schema = arrow_schema.clone();
            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);

            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
        }

        {
            let arrow_schema = arrow_schema.clone();
            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);

            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
        }

        // finally, convert the original values themselves
        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
    }

    /// Two nullable boolean columns, `c0` and `c1`.
    fn test_schema_1() -> Schema {
        Schema::new(vec![
            Field::new("c0", DataType::Boolean, true),
            Field::new("c1", DataType::Boolean, true),
        ])
    }

    /// Field-level metadata is visible through the Arrow view of a `DFSchema`.
    #[test]
    fn test_dfschema_to_schema_conversion() {
        let mut a_metadata = HashMap::new();
        a_metadata.insert("key".to_string(), "value".to_string());
        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);

        let mut b_metadata = HashMap::new();
        b_metadata.insert("key".to_string(), "value".to_string());
        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);

        let schema = Arc::new(Schema::new(vec![a_field, b_field]));

        let df_schema = DFSchema {
            inner: Arc::clone(&schema),
            field_qualifiers: vec![None; schema.fields.len()],
            functional_dependencies: FunctionalDependencies::empty(),
        };

        let converted = df_schema.as_arrow();
        assert_eq!(converted.metadata(), schema.metadata());

        // `inner` shares its `Arc` with `schema`, so the comparison above is
        // trivially true; pin the per-field metadata to concrete values.
        assert_eq!(converted.fields().len(), 2);
        for field in converted.fields().iter() {
            assert_eq!(
                field.metadata().get("key").map(String::as_str),
                Some("value"),
                "metadata missing on field {}",
                field.name()
            );
        }
    }

    #[test]
    fn test_contain_column() -> Result<()> {
        // qualified column that exists
        {
            let col = Column::from_qualified_name("t1.c0");
            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
            assert!(schema.is_column_from_schema(&col));
        }

        // qualified column that does not exist
        {
            let col = Column::from_qualified_name("t1.c2");
            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
            assert!(!schema.is_column_from_schema(&col));
        }

        // unqualified column that exists
        {
            let col = Column::from_name("c0");
            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
            assert!(schema.is_column_from_schema(&col));
        }

        // unqualified column that does not exist
        {
            let col = Column::from_name("c2");
            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
            assert!(!schema.is_column_from_schema(&col));
        }

        Ok(())
    }

    #[test]
    fn test_datatype_is_logically_equal() {
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Int8,
            &DataType::Int8
        ));

        assert!(!DFSchema::datatype_is_logically_equal(
            &DataType::Int8,
            &DataType::Int16
        ));

        // lists whose inner fields differ only in name and nullability are equal
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new("element", DataType::Int8, false).into())
        ));

        // lists with different element types are not equal
        assert!(!DFSchema::datatype_is_logically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
        ));

        let map_field = DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int8, false),
                    Field::new("value", DataType::Int8, true),
                ])),
                true,
            )
            .into(),
            true,
        );

        // maps whose entry/key/value fields differ only in name and nullability are equal
        assert!(DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "pairs",
                    DataType::Struct(Fields::from(vec![
                        Field::new("one", DataType::Int8, false),
                        Field::new("two", DataType::Int8, false)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));
        // different value type
        assert!(!DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int8, false),
                        Field::new("value", DataType::Int16, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // different key type
        assert!(!DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int16, false),
                        Field::new("value", DataType::Int8, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        let struct_field = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int8, true),
            Field::new("b", DataType::Int8, true),
        ]));

        // only nullability differs: equal
        assert!(DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int8, false),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // field names differ
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("x", DataType::Int8, true),
                Field::new("y", DataType::Int8, true),
            ]))
        ));

        // field types differ
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int16, true),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // field counts differ
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true)]))
        ));
    }

    #[test]
    fn test_datatype_is_logically_equivalent_to_dictionary() {
        // a dictionary is logically equal to its value type
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Utf8,
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        ));

        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Utf8View,
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        ));

        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Dictionary(
                Box::new(DataType::Int32),
                Box::new(DataType::List(
                    Field::new("element", DataType::Utf8, false).into()
                ))
            ),
            &DataType::Dictionary(
                Box::new(DataType::Int32),
                Box::new(DataType::List(
                    Field::new("element", DataType::Utf8View, false).into()
                ))
            )
        ));
    }

    #[test]
    fn test_datatype_is_semantically_equal() {
        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::Int8,
            &DataType::Int8
        ));

        assert!(!DFSchema::datatype_is_semantically_equal(
            &DataType::Int8,
            &DataType::Int16
        ));

        // decimal precision and scale are ignored
        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::Decimal32(1, 2),
            &DataType::Decimal32(2, 1),
        ));

        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::Decimal64(1, 2),
            &DataType::Decimal64(2, 1),
        ));

        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::Decimal128(1, 2),
            &DataType::Decimal128(2, 1),
        ));

        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::Decimal256(1, 2),
            &DataType::Decimal256(2, 1),
        ));

        // timestamp unit and timezone are ignored
        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::Timestamp(
                arrow::datatypes::TimeUnit::Microsecond,
                Some("UTC".into())
            ),
            &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
        ));

        // lists whose inner fields differ only in name and nullability are equal
        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new("element", DataType::Int8, false).into())
        ));

        assert!(!DFSchema::datatype_is_semantically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
        ));

        let map_field = DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int8, false),
                    Field::new("value", DataType::Int8, true),
                ])),
                true,
            )
            .into(),
            true,
        );

        // maps whose entry/key/value fields differ only in name and nullability are equal
        assert!(DFSchema::datatype_is_semantically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "pairs",
                    DataType::Struct(Fields::from(vec![
                        Field::new("one", DataType::Int8, false),
                        Field::new("two", DataType::Int8, false)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));
        // different value type
        assert!(!DFSchema::datatype_is_semantically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int8, false),
                        Field::new("value", DataType::Int16, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // different key type
        assert!(!DFSchema::datatype_is_semantically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int16, false),
                        Field::new("value", DataType::Int8, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        let struct_field = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int8, true),
            Field::new("b", DataType::Int8, true),
        ]));

        // These struct checks must exercise the *semantic* comparison; they
        // previously called `datatype_is_logically_equal` by mistake.

        // only nullability differs: equal
        assert!(DFSchema::datatype_is_semantically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int8, false),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // field names differ
        assert!(!DFSchema::datatype_is_semantically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("x", DataType::Int8, true),
                Field::new("y", DataType::Int8, true),
            ]))
        ));

        // field types differ
        assert!(!DFSchema::datatype_is_semantically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int16, true),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // field counts differ
        assert!(!DFSchema::datatype_is_semantically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true)]))
        ));
    }

    #[test]
    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
        // unlike logical equality, a dictionary is not semantically equal to its value type
        assert!(!DFSchema::datatype_is_semantically_equal(
            &DataType::Utf8,
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        ));
    }

    /// Two nullable boolean columns, `c100` and `c101`.
    fn test_schema_2() -> Schema {
        Schema::new(vec![
            Field::new("c100", DataType::Boolean, true),
            Field::new("c101", DataType::Boolean, true),
        ])
    }

    /// Two metadata entries: `k0 -> v0`, `k1 -> v1`.
    fn test_metadata() -> HashMap<String, String> {
        test_metadata_n(2)
    }

    /// `n` metadata entries of the form `k{i} -> v{i}`.
    fn test_metadata_n(n: usize) -> HashMap<String, String> {
        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
    }

    #[test]
    fn test_print_schema_unqualified() {
        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, true),
                Field::new("age", DataType::Int64, true),
                Field::new("active", DataType::Boolean, false),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- id: int32 (nullable = false)
         |-- name: utf8 (nullable = true)
         |-- age: int64 (nullable = true)
         |-- active: boolean (nullable = false)
        ");
    }

    #[test]
    fn test_print_schema_qualified() {
        let schema = DFSchema::try_from_qualified_schema(
            "table1",
            &Schema::new(vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, true),
            ]),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- table1.id: int32 (nullable = false)
         |-- table1.name: utf8 (nullable = true)
        ");
    }

    #[test]
    fn test_print_schema_complex_types() {
        let struct_field = Field::new(
            "address",
            DataType::Struct(Fields::from(vec![
                Field::new("street", DataType::Utf8, true),
                Field::new("city", DataType::Utf8, true),
            ])),
            true,
        );

        let list_field = Field::new(
            "tags",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("id", DataType::Int32, false),
                struct_field,
                list_field,
                Field::new("score", DataType::Decimal128(10, 2), true),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();
        insta::assert_snapshot!(output, @r"
        root
         |-- id: int32 (nullable = false)
         |-- address: struct (nullable = true)
         |    |-- street: utf8 (nullable = true)
         |    |-- city: utf8 (nullable = true)
         |-- tags: list (nullable = true)
         |    |-- item: utf8 (nullable = true)
         |-- score: decimal128(10, 2) (nullable = true)
        ");
    }

    #[test]
    fn test_print_schema_empty() {
        let schema = DFSchema::empty();
        let output = schema.tree_string();
        insta::assert_snapshot!(output, @"root");
    }

    #[test]
    fn test_print_schema_deeply_nested_types() {
        // struct with two levels of primitive children
        let inner_struct = Field::new(
            "inner",
            DataType::Struct(Fields::from(vec![
                Field::new("level1", DataType::Utf8, true),
                Field::new("level2", DataType::Int32, false),
            ])),
            true,
        );

        // list of structs
        let nested_list = Field::new(
            "nested_list",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("id", DataType::Int64, false),
                    Field::new("value", DataType::Float64, true),
                ])),
                true,
            ))),
            true,
        );

        // map whose values are lists
        let map_field = Field::new(
            "map_data",
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new(
                            "value",
                            DataType::List(Arc::new(Field::new(
                                "item",
                                DataType::Int32,
                                true,
                            ))),
                            true,
                        ),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("simple_field", DataType::Utf8, true),
                inner_struct,
                nested_list,
                map_field,
                Field::new(
                    "timestamp_field",
                    DataType::Timestamp(
                        arrow::datatypes::TimeUnit::Microsecond,
                        Some("UTC".into()),
                    ),
                    false,
                ),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- simple_field: utf8 (nullable = true)
         |-- inner: struct (nullable = true)
         |    |-- level1: utf8 (nullable = true)
         |    |-- level2: int32 (nullable = false)
         |-- nested_list: list (nullable = true)
         |    |-- item: struct (nullable = true)
         |    |    |-- id: int64 (nullable = false)
         |    |    |-- value: float64 (nullable = true)
         |-- map_data: map (nullable = true)
         |    |-- key: utf8 (nullable = false)
         |    |-- value: list (nullable = true)
         |    |    |-- item: int32 (nullable = true)
         |-- timestamp_field: timestamp (UTC) (nullable = false)
        ");
    }

    #[test]
    fn test_print_schema_mixed_qualified_unqualified() {
        // qualified and unqualified fields interleaved
        let schema = DFSchema::new_with_metadata(
            vec![
                (
                    Some("table1".into()),
                    Arc::new(Field::new("id", DataType::Int32, false)),
                ),
                (None, Arc::new(Field::new("name", DataType::Utf8, true))),
                (
                    Some("table2".into()),
                    Arc::new(Field::new("score", DataType::Float64, true)),
                ),
                (
                    None,
                    Arc::new(Field::new("active", DataType::Boolean, false)),
                ),
            ],
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- table1.id: int32 (nullable = false)
         |-- name: utf8 (nullable = true)
         |-- table2.score: float64 (nullable = true)
         |-- active: boolean (nullable = false)
        ");
    }

    #[test]
    fn test_print_schema_array_of_map() {
        // the map's entries struct
        let entries_field = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Utf8, false),
            ])),
            false,
        );

        let array_of_map_field = Field::new(
            "array_map_field",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Map(Arc::new(entries_field), false),
                false,
            ))),
            false,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![array_of_map_field].into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- array_map_field: list (nullable = false)
         |    |-- item: map (nullable = false)
         |    |    |-- key: utf8 (nullable = false)
         |    |    |-- value: utf8 (nullable = false)
        ");
    }

    #[test]
    fn test_print_schema_complex_type_combinations() {
        // list of structs
        let list_of_structs = Field::new(
            "list_of_structs",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("id", DataType::Int32, false),
                    Field::new("name", DataType::Utf8, true),
                    Field::new("score", DataType::Float64, true),
                ])),
                true,
            ))),
            true,
        );

        // struct containing lists
        let struct_with_lists = Field::new(
            "struct_with_lists",
            DataType::Struct(Fields::from(vec![
                Field::new(
                    "tags",
                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
                    true,
                ),
                Field::new(
                    "scores",
                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                    false,
                ),
                Field::new("metadata", DataType::Utf8, true),
            ])),
            false,
        );

        // map with struct values
        let map_with_struct_values = Field::new(
            "map_with_struct_values",
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new(
                            "value",
                            DataType::Struct(Fields::from(vec![
                                Field::new("count", DataType::Int64, false),
                                Field::new("active", DataType::Boolean, true),
                            ])),
                            true,
                        ),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        );

        // list of maps
        let list_of_maps = Field::new(
            "list_of_maps",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(Fields::from(vec![
                            Field::new("key", DataType::Utf8, false),
                            Field::new("value", DataType::Int32, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                true,
            ))),
            true,
        );

        // struct -> list -> struct -> map
        let deeply_nested = Field::new(
            "deeply_nested",
            DataType::Struct(Fields::from(vec![
                Field::new("level1", DataType::Utf8, true),
                Field::new(
                    "level2",
                    DataType::List(Arc::new(Field::new(
                        "item",
                        DataType::Struct(Fields::from(vec![
                            Field::new("id", DataType::Int32, false),
                            Field::new(
                                "properties",
                                DataType::Map(
                                    Arc::new(Field::new(
                                        "entries",
                                        DataType::Struct(Fields::from(vec![
                                            Field::new("key", DataType::Utf8, false),
                                            Field::new("value", DataType::Float64, true),
                                        ])),
                                        false,
                                    )),
                                    false,
                                ),
                                true,
                            ),
                        ])),
                        true,
                    ))),
                    false,
                ),
            ])),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                list_of_structs,
                struct_with_lists,
                map_with_struct_values,
                list_of_maps,
                deeply_nested,
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        // NOTE(review): `list_of_maps` and `properties` declare their primitive
        // map value fields nullable, yet the snapshot renders them with
        // `nullable = false`. This pins the current `tree_string` output;
        // confirm whether that rendering is intended.
        insta::assert_snapshot!(output, @r"
        root
         |-- list_of_structs: list (nullable = true)
         |    |-- item: struct (nullable = true)
         |    |    |-- id: int32 (nullable = false)
         |    |    |-- name: utf8 (nullable = true)
         |    |    |-- score: float64 (nullable = true)
         |-- struct_with_lists: struct (nullable = false)
         |    |-- tags: list (nullable = true)
         |    |    |-- item: utf8 (nullable = true)
         |    |-- scores: list (nullable = false)
         |    |    |-- item: int32 (nullable = true)
         |    |-- metadata: utf8 (nullable = true)
         |-- map_with_struct_values: map (nullable = true)
         |    |-- key: utf8 (nullable = false)
         |    |-- value: struct (nullable = true)
         |    |    |-- count: int64 (nullable = false)
         |    |    |-- active: boolean (nullable = true)
         |-- list_of_maps: list (nullable = true)
         |    |-- item: map (nullable = true)
         |    |    |-- key: utf8 (nullable = false)
         |    |    |-- value: int32 (nullable = false)
         |-- deeply_nested: struct (nullable = true)
         |    |-- level1: utf8 (nullable = true)
         |    |-- level2: list (nullable = false)
         |    |    |-- item: struct (nullable = true)
         |    |    |    |-- id: int32 (nullable = false)
         |    |    |    |-- properties: map (nullable = true)
         |    |    |    |    |-- key: utf8 (nullable = false)
         |    |    |    |    |-- value: float64 (nullable = false)
        ");
    }

    #[test]
    fn test_print_schema_edge_case_types() {
        // less common primitive and fixed-size types
        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("null_field", DataType::Null, true),
                Field::new("binary_field", DataType::Binary, false),
                Field::new("large_binary", DataType::LargeBinary, true),
                Field::new("large_utf8", DataType::LargeUtf8, false),
                Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
                Field::new(
                    "fixed_size_list",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Int32, true)),
                        5,
                    ),
                    false,
                ),
                Field::new("decimal32", DataType::Decimal32(9, 4), true),
                Field::new("decimal64", DataType::Decimal64(9, 4), true),
                Field::new("decimal128", DataType::Decimal128(18, 4), true),
                Field::new("decimal256", DataType::Decimal256(38, 10), false),
                Field::new("date32", DataType::Date32, true),
                Field::new("date64", DataType::Date64, false),
                Field::new(
                    "time32_seconds",
                    DataType::Time32(arrow::datatypes::TimeUnit::Second),
                    true,
                ),
                Field::new(
                    "time64_nanoseconds",
                    DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
                    false,
                ),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- null_field: null (nullable = true)
         |-- binary_field: binary (nullable = false)
         |-- large_binary: large_binary (nullable = true)
         |-- large_utf8: large_utf8 (nullable = false)
         |-- fixed_size_binary: fixed_size_binary (nullable = true)
         |-- fixed_size_list: fixed size list (nullable = false)
         |    |-- item: int32 (nullable = true)
         |-- decimal32: decimal32(9, 4) (nullable = true)
         |-- decimal64: decimal64(9, 4) (nullable = true)
         |-- decimal128: decimal128(18, 4) (nullable = true)
         |-- decimal256: decimal256(38, 10) (nullable = false)
         |-- date32: date32 (nullable = true)
         |-- date64: date64 (nullable = false)
         |-- time32_seconds: time32 (nullable = true)
         |-- time64_nanoseconds: time64 (nullable = false)
        ");
    }
}