1use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28 field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29 SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34 DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
/// Shared, reference-counted handle to a [`DFSchema`].
pub type DFSchemaRef = Arc<DFSchema>;
39
/// An Arrow schema whose fields may each carry an optional table qualifier.
///
/// The Arrow fields and schema-level metadata live in `inner`; the qualifier
/// of field `i` is `field_qualifiers[i]`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Underlying Arrow schema (fields plus schema-level metadata).
    inner: SchemaRef,
    /// One optional qualifier per field of `inner`, in the same order.
    /// Expected to have the same length as `inner`'s fields; `iter` zips the
    /// two, so any length difference would be silently truncated there.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Functional dependencies among the fields; `with_functional_dependencies`
    /// validates them against the field count.
    functional_dependencies: FunctionalDependencies,
}
115
116impl DFSchema {
117 pub fn empty() -> Self {
119 Self {
120 inner: Arc::new(Schema::new([])),
121 field_qualifiers: vec![],
122 functional_dependencies: FunctionalDependencies::empty(),
123 }
124 }
125
126 pub fn as_arrow(&self) -> &Schema {
130 self.inner.as_ref()
131 }
132
133 pub fn inner(&self) -> &SchemaRef {
137 &self.inner
138 }
139
140 pub fn new_with_metadata(
142 qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
143 metadata: HashMap<String, String>,
144 ) -> Result<Self> {
145 let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
146 qualified_fields.into_iter().unzip();
147
148 let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
149
150 let dfschema = Self {
151 inner: schema,
152 field_qualifiers: qualifiers,
153 functional_dependencies: FunctionalDependencies::empty(),
154 };
155 dfschema.check_names()?;
156 Ok(dfschema)
157 }
158
159 pub fn from_unqualified_fields(
161 fields: Fields,
162 metadata: HashMap<String, String>,
163 ) -> Result<Self> {
164 let field_count = fields.len();
165 let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
166 let dfschema = Self {
167 inner: schema,
168 field_qualifiers: vec![None; field_count],
169 functional_dependencies: FunctionalDependencies::empty(),
170 };
171 dfschema.check_names()?;
172 Ok(dfschema)
173 }
174
175 pub fn try_from_qualified_schema(
180 qualifier: impl Into<TableReference>,
181 schema: &Schema,
182 ) -> Result<Self> {
183 let qualifier = qualifier.into();
184 let schema = DFSchema {
185 inner: schema.clone().into(),
186 field_qualifiers: vec![Some(qualifier); schema.fields.len()],
187 functional_dependencies: FunctionalDependencies::empty(),
188 };
189 schema.check_names()?;
190 Ok(schema)
191 }
192
193 pub fn from_field_specific_qualified_schema(
195 qualifiers: Vec<Option<TableReference>>,
196 schema: &SchemaRef,
197 ) -> Result<Self> {
198 let dfschema = Self {
199 inner: Arc::clone(schema),
200 field_qualifiers: qualifiers,
201 functional_dependencies: FunctionalDependencies::empty(),
202 };
203 dfschema.check_names()?;
204 Ok(dfschema)
205 }
206
207 pub fn with_field_specific_qualified_schema(
209 &self,
210 qualifiers: Vec<Option<TableReference>>,
211 ) -> Result<Self> {
212 if qualifiers.len() != self.fields().len() {
213 return _plan_err!(
214 "Number of qualifiers must match number of fields. Expected {}, got {}",
215 self.fields().len(),
216 qualifiers.len()
217 );
218 }
219 Ok(DFSchema {
220 inner: Arc::clone(&self.inner),
221 field_qualifiers: qualifiers,
222 functional_dependencies: self.functional_dependencies.clone(),
223 })
224 }
225
226 pub fn check_names(&self) -> Result<()> {
228 let mut qualified_names = BTreeSet::new();
229 let mut unqualified_names = BTreeSet::new();
230
231 for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
232 if let Some(qualifier) = qualifier {
233 if !qualified_names.insert((qualifier, field.name())) {
234 return _schema_err!(SchemaError::DuplicateQualifiedField {
235 qualifier: Box::new(qualifier.clone()),
236 name: field.name().to_string(),
237 });
238 }
239 } else if !unqualified_names.insert(field.name()) {
240 return _schema_err!(SchemaError::DuplicateUnqualifiedField {
241 name: field.name().to_string()
242 });
243 }
244 }
245
246 for (qualifier, name) in qualified_names {
247 if unqualified_names.contains(name) {
248 return _schema_err!(SchemaError::AmbiguousReference {
249 field: Box::new(Column::new(Some(qualifier.clone()), name))
250 });
251 }
252 }
253 Ok(())
254 }
255
256 pub fn with_functional_dependencies(
258 mut self,
259 functional_dependencies: FunctionalDependencies,
260 ) -> Result<Self> {
261 if functional_dependencies.is_valid(self.inner.fields.len()) {
262 self.functional_dependencies = functional_dependencies;
263 Ok(self)
264 } else {
265 _plan_err!(
266 "Invalid functional dependency: {:?}",
267 functional_dependencies
268 )
269 }
270 }
271
272 pub fn join(&self, schema: &DFSchema) -> Result<Self> {
275 let mut schema_builder = SchemaBuilder::new();
276 schema_builder.extend(self.inner.fields().iter().cloned());
277 schema_builder.extend(schema.fields().iter().cloned());
278 let new_schema = schema_builder.finish();
279
280 let mut new_metadata = self.inner.metadata.clone();
281 new_metadata.extend(schema.inner.metadata.clone());
282 let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
283
284 let mut new_qualifiers = self.field_qualifiers.clone();
285 new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
286
287 let new_self = Self {
288 inner: Arc::new(new_schema_with_metadata),
289 field_qualifiers: new_qualifiers,
290 functional_dependencies: FunctionalDependencies::empty(),
291 };
292 new_self.check_names()?;
293 Ok(new_self)
294 }
295
296 pub fn merge(&mut self, other_schema: &DFSchema) {
313 if other_schema.inner.fields.is_empty() {
314 return;
315 }
316
317 let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
318 self.iter().collect();
319 let self_unqualified_names: HashSet<&str> = self
320 .inner
321 .fields
322 .iter()
323 .map(|field| field.name().as_str())
324 .collect();
325
326 let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
327 let mut qualifiers = Vec::new();
328 for (qualifier, field) in other_schema.iter() {
329 let duplicated_field = match qualifier {
331 Some(q) => self_fields.contains(&(Some(q), field)),
332 None => self_unqualified_names.contains(field.name().as_str()),
334 };
335 if !duplicated_field {
336 schema_builder.push(Arc::clone(field));
337 qualifiers.push(qualifier.cloned());
338 }
339 }
340 let mut metadata = self.inner.metadata.clone();
341 metadata.extend(other_schema.inner.metadata.clone());
342
343 let finished = schema_builder.finish();
344 let finished_with_metadata = finished.with_metadata(metadata);
345 self.inner = finished_with_metadata.into();
346 self.field_qualifiers.extend(qualifiers);
347 }
348
349 pub fn fields(&self) -> &Fields {
351 &self.inner.fields
352 }
353
354 pub fn field(&self, i: usize) -> &Field {
357 &self.inner.fields[i]
358 }
359
360 pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
363 (self.field_qualifiers[i].as_ref(), self.field(i))
364 }
365
366 pub fn index_of_column_by_name(
367 &self,
368 qualifier: Option<&TableReference>,
369 name: &str,
370 ) -> Option<usize> {
371 let mut matches = self
372 .iter()
373 .enumerate()
374 .filter(|(_, (q, f))| match (qualifier, q) {
375 (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
379 (Some(_), None) => false,
381 (None, Some(_)) | (None, None) => f.name() == name,
383 })
384 .map(|(idx, _)| idx);
385 matches.next()
386 }
387
388 pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
394 self.index_of_column_by_name(col.relation.as_ref(), &col.name)
395 }
396
397 pub fn index_of_column(&self, col: &Column) -> Result<usize> {
403 self.maybe_index_of_column(col)
404 .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
405 }
406
407 pub fn is_column_from_schema(&self, col: &Column) -> bool {
409 self.index_of_column_by_name(col.relation.as_ref(), &col.name)
410 .is_some()
411 }
412
413 pub fn field_with_name(
415 &self,
416 qualifier: Option<&TableReference>,
417 name: &str,
418 ) -> Result<&Field> {
419 if let Some(qualifier) = qualifier {
420 self.field_with_qualified_name(qualifier, name)
421 } else {
422 self.field_with_unqualified_name(name)
423 }
424 }
425
426 pub fn qualified_field_with_name(
428 &self,
429 qualifier: Option<&TableReference>,
430 name: &str,
431 ) -> Result<(Option<&TableReference>, &Field)> {
432 if let Some(qualifier) = qualifier {
433 let idx = self
434 .index_of_column_by_name(Some(qualifier), name)
435 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
436 Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
437 } else {
438 self.qualified_field_with_unqualified_name(name)
439 }
440 }
441
442 pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
444 self.iter()
445 .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
446 .map(|(_, f)| f.as_ref())
447 .collect()
448 }
449
450 pub fn fields_indices_with_qualified(
452 &self,
453 qualifier: &TableReference,
454 ) -> Vec<usize> {
455 self.iter()
456 .enumerate()
457 .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
458 .collect()
459 }
460
461 pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
463 self.fields()
464 .iter()
465 .filter(|field| field.name() == name)
466 .map(|f| f.as_ref())
467 .collect()
468 }
469
470 pub fn qualified_fields_with_unqualified_name(
472 &self,
473 name: &str,
474 ) -> Vec<(Option<&TableReference>, &Field)> {
475 self.iter()
476 .filter(|(_, field)| field.name() == name)
477 .map(|(qualifier, field)| (qualifier, field.as_ref()))
478 .collect()
479 }
480
481 pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
483 self.iter()
484 .filter(|(_, field)| field.name() == name)
485 .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
486 .collect()
487 }
488
489 pub fn columns(&self) -> Vec<Column> {
491 self.iter()
492 .map(|(qualifier, field)| {
493 Column::new(qualifier.cloned(), field.name().clone())
494 })
495 .collect()
496 }
497
498 pub fn qualified_field_with_unqualified_name(
500 &self,
501 name: &str,
502 ) -> Result<(Option<&TableReference>, &Field)> {
503 let matches = self.qualified_fields_with_unqualified_name(name);
504 match matches.len() {
505 0 => Err(unqualified_field_not_found(name, self)),
506 1 => Ok((matches[0].0, matches[0].1)),
507 _ => {
508 let fields_without_qualifier = matches
516 .iter()
517 .filter(|(q, _)| q.is_none())
518 .collect::<Vec<_>>();
519 if fields_without_qualifier.len() == 1 {
520 Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
521 } else {
522 _schema_err!(SchemaError::AmbiguousReference {
523 field: Box::new(Column::new_unqualified(name.to_string()))
524 })
525 }
526 }
527 }
528 }
529
530 pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
532 self.qualified_field_with_unqualified_name(name)
533 .map(|(_, field)| field)
534 }
535
536 pub fn field_with_qualified_name(
538 &self,
539 qualifier: &TableReference,
540 name: &str,
541 ) -> Result<&Field> {
542 let idx = self
543 .index_of_column_by_name(Some(qualifier), name)
544 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
545
546 Ok(self.field(idx))
547 }
548
549 pub fn qualified_field_from_column(
551 &self,
552 column: &Column,
553 ) -> Result<(Option<&TableReference>, &Field)> {
554 self.qualified_field_with_name(column.relation.as_ref(), &column.name)
555 }
556
557 pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
559 self.fields().iter().any(|field| field.name() == name)
560 }
561
562 pub fn has_column_with_qualified_name(
564 &self,
565 qualifier: &TableReference,
566 name: &str,
567 ) -> bool {
568 self.iter()
569 .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
570 }
571
572 pub fn has_column(&self, column: &Column) -> bool {
574 match &column.relation {
575 Some(r) => self.has_column_with_qualified_name(r, &column.name),
576 None => self.has_column_with_unqualified_name(&column.name),
577 }
578 }
579
580 pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
582 self.inner
583 .fields
584 .iter()
585 .zip(arrow_schema.fields().iter())
586 .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
587 }
588
589 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
591 pub fn check_arrow_schema_type_compatible(
592 &self,
593 arrow_schema: &Schema,
594 ) -> Result<()> {
595 let self_arrow_schema = self.as_arrow();
596 self_arrow_schema
597 .fields()
598 .iter()
599 .zip(arrow_schema.fields().iter())
600 .try_for_each(|(l_field, r_field)| {
601 if !can_cast_types(r_field.data_type(), l_field.data_type()) {
602 _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
603 r_field.name(),
604 r_field.data_type(),
605 l_field.name(),
606 l_field.data_type())
607 } else {
608 Ok(())
609 }
610 })
611 }
612
613 pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
619 if self.fields().len() != other.fields().len() {
620 return false;
621 }
622 let self_fields = self.iter();
623 let other_fields = other.iter();
624 self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
625 q1 == q2
626 && f1.name() == f2.name()
627 && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
628 })
629 }
630
631 #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
632 pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
633 self.has_equivalent_names_and_types(other).is_ok()
634 }
635
636 pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
648 if self.fields().len() != other.fields().len() {
650 _plan_err!(
651 "Schema mismatch: the schema length are not same \
652 Expected schema length: {}, got: {}",
653 self.fields().len(),
654 other.fields().len()
655 )
656 } else {
657 self.fields()
660 .iter()
661 .zip(other.fields().iter())
662 .try_for_each(|(f1, f2)| {
663 if f1.name() != f2.name()
664 || (!DFSchema::datatype_is_semantically_equal(
665 f1.data_type(),
666 f2.data_type(),
667 ))
668 {
669 _plan_err!(
670 "Schema mismatch: Expected field '{}' with type {}, \
671 but got '{}' with type {}.",
672 f1.name(),
673 f1.data_type(),
674 f2.name(),
675 f2.data_type()
676 )
677 } else {
678 Ok(())
679 }
680 })
681 }
682 }
683
    /// Returns `true` if `dt1` and `dt2` are *logically* equal, a looser
    /// relation than [`Self::datatype_is_semantically_equal`] (the final arm
    /// falls back to it).
    ///
    /// In addition, this treats:
    /// * a dictionary type as equal to its value type, and two dictionaries as
    ///   equal when their value types are `==` (key types are ignored);
    /// * `Utf8` and `Utf8View` as equal.
    ///
    /// Nested types recurse. For lists and maps only the child data types are
    /// compared (inner field names are ignored). Struct and union members must
    /// also have equal names; union members must also have equal type ids.
    ///
    /// # Panics
    /// Panics if both sides are `Map` and either entries field is not a struct.
    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
        match (dt1, dt2) {
            // NOTE(review): dictionary values are compared with `==`, not
            // recursively, so e.g. Dictionary(_, Utf8) vs Utf8View is `false`
            // even though Utf8 vs Utf8View is `true` below.
            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
                v1.as_ref() == v2.as_ref()
            }
            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
            // Only the element data types are compared; the inner field's name
            // (and the FixedSizeList length) are ignored.
            (DataType::List(f1), DataType::List(f2))
            | (DataType::LargeList(f1), DataType::LargeList(f2))
            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
            }
            // Compare the entries struct member-by-member on data type only;
            // the second Map parameter is ignored.
            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
                match (f1.data_type(), f2.data_type()) {
                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                        f1_inner.len() == f2_inner.len()
                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                                Self::datatype_is_logically_equal(
                                    f1.data_type(),
                                    f2.data_type(),
                                )
                            })
                    }
                    _ => panic!("Map type should have an inner struct field"),
                }
            }
            // Struct members must match pairwise by name and logical type.
            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    iter1
                        .zip(iter2)
                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
            }
            // Union members must match pairwise by type id, name and logical type.
            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    iter1
                        .zip(iter2)
                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
            }
            (DataType::Utf8, DataType::Utf8View) => true,
            (DataType::Utf8View, DataType::Utf8) => true,
            _ => Self::datatype_is_semantically_equal(dt1, dt2),
        }
    }
746
    /// Returns `true` if `dt1` and `dt2` are *semantically* equal: `==`,
    /// except that:
    /// * dictionaries compare key and value types recursively;
    /// * lists and maps compare only child data types (inner field names,
    ///   nullability, the FixedSizeList length and the second Map parameter
    ///   are ignored);
    /// * struct / union members must match pairwise by name (and union type
    ///   id) with recursively equal types;
    /// * decimals of the same width ignore precision and scale;
    /// * timestamps ignore time unit and timezone.
    ///
    /// # Panics
    /// Panics if both sides are `Map` and either entries field is not a struct.
    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
        match (dt1, dt2) {
            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
            }
            // NOTE(review): FixedSizeList lengths are not compared, so lists of
            // different fixed sizes are considered equal — confirm intended.
            (DataType::List(f1), DataType::List(f2))
            | (DataType::LargeList(f1), DataType::LargeList(f2))
            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
            }
            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
                match (f1.data_type(), f2.data_type()) {
                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                        f1_inner.len() == f2_inner.len()
                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                                Self::datatype_is_semantically_equal(
                                    f1.data_type(),
                                    f2.data_type(),
                                )
                            })
                    }
                    _ => panic!("Map type should have an inner struct field"),
                }
            }
            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    iter1
                        .zip(iter2)
                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
            }
            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
                let iter1 = fields1.iter();
                let iter2 = fields2.iter();
                fields1.len() == fields2.len() &&
                    iter1
                        .zip(iter2)
                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
            }
            // Same-width decimals are equal regardless of precision/scale.
            (
                DataType::Decimal32(_l_precision, _l_scale),
                DataType::Decimal32(_r_precision, _r_scale),
            ) => true,
            (
                DataType::Decimal64(_l_precision, _l_scale),
                DataType::Decimal64(_r_precision, _r_scale),
            ) => true,
            (
                DataType::Decimal128(_l_precision, _l_scale),
                DataType::Decimal128(_r_precision, _r_scale),
            ) => true,
            (
                DataType::Decimal256(_l_precision, _l_scale),
                DataType::Decimal256(_r_precision, _r_scale),
            ) => true,
            // Timestamps are equal regardless of unit and timezone.
            (
                DataType::Timestamp(_l_time_unit, _l_timezone),
                DataType::Timestamp(_r_time_unit, _r_timezone),
            ) => true,
            _ => dt1 == dt2,
        }
    }
823
824 fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
825 f1.name() == f2.name()
826 && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
827 }
828
829 fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
830 f1.name() == f2.name()
831 && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
832 }
833
834 pub fn strip_qualifiers(self) -> Self {
836 DFSchema {
837 field_qualifiers: vec![None; self.inner.fields.len()],
838 inner: self.inner,
839 functional_dependencies: self.functional_dependencies,
840 }
841 }
842
843 pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
845 let qualifier = qualifier.into();
846 DFSchema {
847 field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
848 inner: self.inner,
849 functional_dependencies: self.functional_dependencies,
850 }
851 }
852
853 pub fn field_names(&self) -> Vec<String> {
855 self.iter()
856 .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
857 .collect::<Vec<_>>()
858 }
859
860 pub fn metadata(&self) -> &HashMap<String, String> {
862 &self.inner.metadata
863 }
864
865 pub fn functional_dependencies(&self) -> &FunctionalDependencies {
867 &self.functional_dependencies
868 }
869
870 pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
872 self.field_qualifiers
873 .iter()
874 .zip(self.inner.fields().iter())
875 .map(|(qualifier, field)| (qualifier.as_ref(), field))
876 }
877 pub fn tree_string(&self) -> impl Display + '_ {
907 let mut result = String::from("root\n");
908
909 for (qualifier, field) in self.iter() {
910 let field_name = match qualifier {
911 Some(q) => format!("{}.{}", q, field.name()),
912 None => field.name().to_string(),
913 };
914
915 format_field_with_indent(
916 &mut result,
917 &field_name,
918 field.data_type(),
919 field.is_nullable(),
920 " ",
921 );
922 }
923
924 if result.ends_with('\n') {
926 result.pop();
927 }
928
929 result
930 }
931}
932
933fn format_field_with_indent(
935 result: &mut String,
936 field_name: &str,
937 data_type: &DataType,
938 nullable: bool,
939 indent: &str,
940) {
941 let nullable_str = nullable.to_string().to_lowercase();
942 let child_indent = format!("{indent}| ");
943
944 match data_type {
945 DataType::List(field) => {
946 result.push_str(&format!(
947 "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
948 ));
949 format_field_with_indent(
950 result,
951 field.name(),
952 field.data_type(),
953 field.is_nullable(),
954 &child_indent,
955 );
956 }
957 DataType::LargeList(field) => {
958 result.push_str(&format!(
959 "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
960 ));
961 format_field_with_indent(
962 result,
963 field.name(),
964 field.data_type(),
965 field.is_nullable(),
966 &child_indent,
967 );
968 }
969 DataType::FixedSizeList(field, _size) => {
970 result.push_str(&format!(
971 "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
972 ));
973 format_field_with_indent(
974 result,
975 field.name(),
976 field.data_type(),
977 field.is_nullable(),
978 &child_indent,
979 );
980 }
981 DataType::Map(field, _) => {
982 result.push_str(&format!(
983 "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
984 ));
985 if let DataType::Struct(inner_fields) = field.data_type() {
986 if inner_fields.len() == 2 {
987 format_field_with_indent(
988 result,
989 "key",
990 inner_fields[0].data_type(),
991 inner_fields[0].is_nullable(),
992 &child_indent,
993 );
994 let value_contains_null =
995 field.is_nullable().to_string().to_lowercase();
996 match inner_fields[1].data_type() {
998 DataType::Struct(_)
999 | DataType::List(_)
1000 | DataType::LargeList(_)
1001 | DataType::FixedSizeList(_, _)
1002 | DataType::Map(_, _) => {
1003 format_field_with_indent(
1004 result,
1005 "value",
1006 inner_fields[1].data_type(),
1007 inner_fields[1].is_nullable(),
1008 &child_indent,
1009 );
1010 }
1011 _ => {
1012 result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1013 format_simple_data_type(inner_fields[1].data_type())));
1014 }
1015 }
1016 }
1017 }
1018 }
1019 DataType::Struct(fields) => {
1020 result.push_str(&format!(
1021 "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1022 ));
1023 for struct_field in fields {
1024 format_field_with_indent(
1025 result,
1026 struct_field.name(),
1027 struct_field.data_type(),
1028 struct_field.is_nullable(),
1029 &child_indent,
1030 );
1031 }
1032 }
1033 _ => {
1034 let type_str = format_simple_data_type(data_type);
1035 result.push_str(&format!(
1036 "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1037 ));
1038 }
1039 }
1040}
1041
1042fn format_simple_data_type(data_type: &DataType) -> String {
1044 match data_type {
1045 DataType::Boolean => "boolean".to_string(),
1046 DataType::Int8 => "int8".to_string(),
1047 DataType::Int16 => "int16".to_string(),
1048 DataType::Int32 => "int32".to_string(),
1049 DataType::Int64 => "int64".to_string(),
1050 DataType::UInt8 => "uint8".to_string(),
1051 DataType::UInt16 => "uint16".to_string(),
1052 DataType::UInt32 => "uint32".to_string(),
1053 DataType::UInt64 => "uint64".to_string(),
1054 DataType::Float16 => "float16".to_string(),
1055 DataType::Float32 => "float32".to_string(),
1056 DataType::Float64 => "float64".to_string(),
1057 DataType::Utf8 => "utf8".to_string(),
1058 DataType::LargeUtf8 => "large_utf8".to_string(),
1059 DataType::Binary => "binary".to_string(),
1060 DataType::LargeBinary => "large_binary".to_string(),
1061 DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1062 DataType::Date32 => "date32".to_string(),
1063 DataType::Date64 => "date64".to_string(),
1064 DataType::Time32(_) => "time32".to_string(),
1065 DataType::Time64(_) => "time64".to_string(),
1066 DataType::Timestamp(_, tz) => match tz {
1067 Some(tz_str) => format!("timestamp ({tz_str})"),
1068 None => "timestamp".to_string(),
1069 },
1070 DataType::Interval(_) => "interval".to_string(),
1071 DataType::Dictionary(_, value_type) => {
1072 format_simple_data_type(value_type.as_ref())
1073 }
1074 DataType::Decimal32(precision, scale) => {
1075 format!("decimal32({precision}, {scale})")
1076 }
1077 DataType::Decimal64(precision, scale) => {
1078 format!("decimal64({precision}, {scale})")
1079 }
1080 DataType::Decimal128(precision, scale) => {
1081 format!("decimal128({precision}, {scale})")
1082 }
1083 DataType::Decimal256(precision, scale) => {
1084 format!("decimal256({precision}, {scale})")
1085 }
1086 DataType::Null => "null".to_string(),
1087 _ => format!("{data_type}").to_lowercase(),
1088 }
1089}
1090
1091impl AsRef<Schema> for DFSchema {
1093 fn as_ref(&self) -> &Schema {
1094 self.as_arrow()
1095 }
1096}
1097
1098impl AsRef<SchemaRef> for DFSchema {
1101 fn as_ref(&self) -> &SchemaRef {
1102 self.inner()
1103 }
1104}
1105
1106impl TryFrom<Schema> for DFSchema {
1108 type Error = DataFusionError;
1109 fn try_from(schema: Schema) -> Result<Self, Self::Error> {
1110 Self::try_from(Arc::new(schema))
1111 }
1112}
1113
1114impl TryFrom<SchemaRef> for DFSchema {
1115 type Error = DataFusionError;
1116 fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1117 let field_count = schema.fields.len();
1118 let dfschema = Self {
1119 inner: schema,
1120 field_qualifiers: vec![None; field_count],
1121 functional_dependencies: FunctionalDependencies::empty(),
1122 };
1123 Ok(dfschema)
1129 }
1130}
1131
1132impl Hash for DFSchema {
1134 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1135 self.inner.fields.hash(state);
1136 self.inner.metadata.len().hash(state); }
1138}
1139
1140pub trait ToDFSchema
1142where
1143 Self: Sized,
1144{
1145 fn to_dfschema(self) -> Result<DFSchema>;
1147
1148 fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1150 Ok(Arc::new(self.to_dfschema()?))
1151 }
1152}
1153
1154impl ToDFSchema for Schema {
1155 fn to_dfschema(self) -> Result<DFSchema> {
1156 DFSchema::try_from(self)
1157 }
1158}
1159
1160impl ToDFSchema for SchemaRef {
1161 fn to_dfschema(self) -> Result<DFSchema> {
1162 DFSchema::try_from(self)
1163 }
1164}
1165
1166impl ToDFSchema for Vec<Field> {
1167 fn to_dfschema(self) -> Result<DFSchema> {
1168 let field_count = self.len();
1169 let schema = Schema {
1170 fields: self.into(),
1171 metadata: HashMap::new(),
1172 };
1173 let dfschema = DFSchema {
1174 inner: schema.into(),
1175 field_qualifiers: vec![None; field_count],
1176 functional_dependencies: FunctionalDependencies::empty(),
1177 };
1178 Ok(dfschema)
1179 }
1180}
1181
1182impl Display for DFSchema {
1183 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1184 write!(
1185 f,
1186 "fields:[{}], metadata:{:?}",
1187 self.iter()
1188 .map(|(q, f)| qualified_name(q, f.name()))
1189 .collect::<Vec<String>>()
1190 .join(", "),
1191 self.inner.metadata
1192 )
1193 }
1194}
1195
1196pub trait ExprSchema: std::fmt::Debug {
1202 fn nullable(&self, col: &Column) -> Result<bool> {
1204 Ok(self.field_from_column(col)?.is_nullable())
1205 }
1206
1207 fn data_type(&self, col: &Column) -> Result<&DataType> {
1209 Ok(self.field_from_column(col)?.data_type())
1210 }
1211
1212 fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1214 Ok(self.field_from_column(col)?.metadata())
1215 }
1216
1217 fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1219 let field = self.field_from_column(col)?;
1220 Ok((field.data_type(), field.is_nullable()))
1221 }
1222
1223 fn field_from_column(&self, col: &Column) -> Result<&Field>;
1225}
1226
1227impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1229 fn nullable(&self, col: &Column) -> Result<bool> {
1230 self.as_ref().nullable(col)
1231 }
1232
1233 fn data_type(&self, col: &Column) -> Result<&DataType> {
1234 self.as_ref().data_type(col)
1235 }
1236
1237 fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1238 ExprSchema::metadata(self.as_ref(), col)
1239 }
1240
1241 fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1242 self.as_ref().data_type_and_nullable(col)
1243 }
1244
1245 fn field_from_column(&self, col: &Column) -> Result<&Field> {
1246 self.as_ref().field_from_column(col)
1247 }
1248}
1249
1250impl ExprSchema for DFSchema {
1251 fn field_from_column(&self, col: &Column) -> Result<&Field> {
1252 match &col.relation {
1253 Some(r) => self.field_with_qualified_name(r, &col.name),
1254 None => self.field_with_unqualified_name(&col.name),
1255 }
1256 }
1257}
1258
/// Extension methods for comparing two Arrow schemas field by field (by
/// position).
pub trait SchemaExt {
    /// As implemented for [`Schema`]: `true` if both schemas have the same
    /// number of fields and each pair has the same name and semantically equal
    /// data types (see `DFSchema::datatype_is_semantically_equal`).
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// As implemented for [`Schema`]: checks that `other` fits `self` — same
    /// field count, same names, and for each field either logically equal
    /// data types or a type in `other` castable to the one in `self`.
    ///
    /// # Errors
    /// Returns a plan error describing the first mismatch.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1276
1277impl SchemaExt for Schema {
1278 fn equivalent_names_and_types(&self, other: &Self) -> bool {
1279 if self.fields().len() != other.fields().len() {
1280 return false;
1281 }
1282
1283 self.fields()
1284 .iter()
1285 .zip(other.fields().iter())
1286 .all(|(f1, f2)| {
1287 f1.name() == f2.name()
1288 && DFSchema::datatype_is_semantically_equal(
1289 f1.data_type(),
1290 f2.data_type(),
1291 )
1292 })
1293 }
1294
1295 fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1297 if self.fields().len() != other.fields().len() {
1299 _plan_err!(
1300 "Inserting query must have the same schema length as the table. \
1301 Expected table schema length: {}, got: {}",
1302 self.fields().len(),
1303 other.fields().len()
1304 )
1305 } else {
1306 self.fields()
1309 .iter()
1310 .zip(other.fields().iter())
1311 .try_for_each(|(f1, f2)| {
1312 if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1313 _plan_err!(
1314 "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1315 but got '{}' with type {}.",
1316 f1.name(),
1317 f1.data_type(),
1318 f2.name(),
1319 f2.data_type())
1320 } else {
1321 Ok(())
1322 }
1323 })
1324 }
1325 }
1326}
1327
1328pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1329 match qualifier {
1330 Some(q) => format!("{q}.{name}"),
1331 None => name.to_string(),
1332 }
1333}
1334
1335#[cfg(test)]
1336mod tests {
1337 use crate::assert_contains;
1338
1339 use super::*;
1340
1341 #[test]
1342 fn qualifier_in_name() -> Result<()> {
1343 let col = Column::from_name("t1.c0");
1344 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1345 let err = schema.index_of_column(&col).unwrap_err();
1347 let expected = "Schema error: No field named \"t1.c0\". \
1348 Column names are case sensitive. \
1349 You can use double quotes to refer to the \"\"t1.c0\"\" column \
1350 or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1351 Did you mean 't1.c0'?.";
1352 assert_eq!(err.strip_backtrace(), expected);
1353 Ok(())
1354 }
1355
1356 #[test]
1357 fn quoted_qualifiers_in_name() -> Result<()> {
1358 let col = Column::from_name("t1.c0");
1359 let schema = DFSchema::try_from_qualified_schema(
1360 "t1",
1361 &Schema::new(vec![
1362 Field::new("CapitalColumn", DataType::Boolean, true),
1363 Field::new("field.with.period", DataType::Boolean, true),
1364 ]),
1365 )?;
1366
1367 let err = schema.index_of_column(&col).unwrap_err();
1369 let expected = "Schema error: No field named \"t1.c0\". \
1370 Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1371 assert_eq!(err.strip_backtrace(), expected);
1372 Ok(())
1373 }
1374
1375 #[test]
1376 fn from_unqualified_schema() -> Result<()> {
1377 let schema = DFSchema::try_from(test_schema_1())?;
1378 assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1379 Ok(())
1380 }
1381
1382 #[test]
1383 fn from_qualified_schema() -> Result<()> {
1384 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1385 assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1386 Ok(())
1387 }
1388
1389 #[test]
1390 fn test_from_field_specific_qualified_schema() -> Result<()> {
1391 let schema = DFSchema::from_field_specific_qualified_schema(
1392 vec![Some("t1".into()), None],
1393 &Arc::new(Schema::new(vec![
1394 Field::new("c0", DataType::Boolean, true),
1395 Field::new("c1", DataType::Boolean, true),
1396 ])),
1397 )?;
1398 assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1399 Ok(())
1400 }
1401
1402 #[test]
1403 fn test_from_qualified_fields() -> Result<()> {
1404 let schema = DFSchema::new_with_metadata(
1405 vec![
1406 (
1407 Some("t0".into()),
1408 Arc::new(Field::new("c0", DataType::Boolean, true)),
1409 ),
1410 (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1411 ],
1412 HashMap::new(),
1413 )?;
1414 assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1415 Ok(())
1416 }
1417
1418 #[test]
1419 fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1420 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1421 let arrow_schema = schema.as_arrow();
1422 insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
1423 Ok(())
1424 }
1425
1426 #[test]
1427 fn join_qualified() -> Result<()> {
1428 let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1429 let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1430 let join = left.join(&right)?;
1431 assert_eq!(
1432 "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1433 join.to_string()
1434 );
1435 assert!(join
1437 .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1438 .is_ok());
1439 assert!(join
1440 .field_with_qualified_name(&TableReference::bare("t2"), "c0")
1441 .is_ok());
1442 assert!(join.field_with_unqualified_name("c0").is_err());
1444 assert!(join.field_with_unqualified_name("t1.c0").is_err());
1445 assert!(join.field_with_unqualified_name("t2.c0").is_err());
1446 Ok(())
1447 }
1448
1449 #[test]
1450 fn join_qualified_duplicate() -> Result<()> {
1451 let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1452 let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1453 let join = left.join(&right);
1454 assert_eq!(
1455 join.unwrap_err().strip_backtrace(),
1456 "Schema error: Schema contains duplicate qualified field name t1.c0",
1457 );
1458 Ok(())
1459 }
1460
1461 #[test]
1462 fn join_unqualified_duplicate() -> Result<()> {
1463 let left = DFSchema::try_from(test_schema_1())?;
1464 let right = DFSchema::try_from(test_schema_1())?;
1465 let join = left.join(&right);
1466 assert_eq!(
1467 join.unwrap_err().strip_backtrace(),
1468 "Schema error: Schema contains duplicate unqualified field name c0"
1469 );
1470 Ok(())
1471 }
1472
1473 #[test]
1474 fn join_mixed() -> Result<()> {
1475 let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1476 let right = DFSchema::try_from(test_schema_2())?;
1477 let join = left.join(&right)?;
1478 assert_eq!(
1479 "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1480 join.to_string()
1481 );
1482 assert!(join
1484 .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1485 .is_ok());
1486 assert!(join.field_with_unqualified_name("c0").is_ok());
1487 assert!(join.field_with_unqualified_name("c100").is_ok());
1488 assert!(join.field_with_name(None, "c100").is_ok());
1489 assert!(join.field_with_unqualified_name("t1.c0").is_err());
1491 assert!(join.field_with_unqualified_name("t1.c100").is_err());
1492 assert!(join
1493 .field_with_qualified_name(&TableReference::bare(""), "c100")
1494 .is_err());
1495 Ok(())
1496 }
1497
1498 #[test]
1499 fn join_mixed_duplicate() -> Result<()> {
1500 let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1501 let right = DFSchema::try_from(test_schema_1())?;
1502 let join = left.join(&right);
1503 assert_contains!(join.unwrap_err().to_string(),
1504 "Schema error: Schema contains qualified \
1505 field name t1.c0 and unqualified field name c0 which would be ambiguous");
1506 Ok(())
1507 }
1508
1509 #[test]
1510 fn helpful_error_messages() -> Result<()> {
1511 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1512 let expected_help = "Valid fields are t1.c0, t1.c1.";
1513 assert_contains!(
1514 schema
1515 .field_with_qualified_name(&TableReference::bare("x"), "y")
1516 .unwrap_err()
1517 .to_string(),
1518 expected_help
1519 );
1520 assert_contains!(
1521 schema
1522 .field_with_unqualified_name("y")
1523 .unwrap_err()
1524 .to_string(),
1525 expected_help
1526 );
1527 assert!(schema.index_of_column_by_name(None, "y").is_none());
1528 assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1529
1530 Ok(())
1531 }
1532
1533 #[test]
1534 fn select_without_valid_fields() {
1535 let schema = DFSchema::empty();
1536
1537 let col = Column::from_qualified_name("t1.c0");
1538 let err = schema.index_of_column(&col).unwrap_err();
1539 let expected = "Schema error: No field named t1.c0.";
1540 assert_eq!(err.strip_backtrace(), expected);
1541
1542 let col = Column::from_name("c0");
1544 let err = schema.index_of_column(&col).err().unwrap();
1545 let expected = "Schema error: No field named c0.";
1546 assert_eq!(err.strip_backtrace(), expected);
1547 }
1548
1549 #[test]
1550 fn into() {
1551 let arrow_schema = Schema::new_with_metadata(
1553 vec![Field::new("c0", DataType::Int64, true)],
1554 test_metadata(),
1555 );
1556 let arrow_schema_ref = Arc::new(arrow_schema.clone());
1557
1558 let df_schema = DFSchema {
1559 inner: Arc::clone(&arrow_schema_ref),
1560 field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1561 functional_dependencies: FunctionalDependencies::empty(),
1562 };
1563 let df_schema_ref = Arc::new(df_schema.clone());
1564
1565 {
1566 let arrow_schema = arrow_schema.clone();
1567 let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1568
1569 assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1570 assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1571 }
1572
1573 {
1574 let arrow_schema = arrow_schema.clone();
1575 let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1576
1577 assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1578 assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1579 }
1580
1581 assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1583 assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1584 }
1585
1586 fn test_schema_1() -> Schema {
1587 Schema::new(vec![
1588 Field::new("c0", DataType::Boolean, true),
1589 Field::new("c1", DataType::Boolean, true),
1590 ])
1591 }
1592 #[test]
1593 fn test_dfschema_to_schema_conversion() {
1594 let mut a_metadata = HashMap::new();
1595 a_metadata.insert("key".to_string(), "value".to_string());
1596 let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1597
1598 let mut b_metadata = HashMap::new();
1599 b_metadata.insert("key".to_string(), "value".to_string());
1600 let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1601
1602 let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1603
1604 let df_schema = DFSchema {
1605 inner: Arc::clone(&schema),
1606 field_qualifiers: vec![None; schema.fields.len()],
1607 functional_dependencies: FunctionalDependencies::empty(),
1608 };
1609
1610 assert_eq!(df_schema.inner.metadata(), schema.metadata())
1611 }
1612
1613 #[test]
1614 fn test_contain_column() -> Result<()> {
1615 {
1617 let col = Column::from_qualified_name("t1.c0");
1618 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1619 assert!(schema.is_column_from_schema(&col));
1620 }
1621
1622 {
1624 let col = Column::from_qualified_name("t1.c2");
1625 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1626 assert!(!schema.is_column_from_schema(&col));
1627 }
1628
1629 {
1631 let col = Column::from_name("c0");
1632 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1633 assert!(schema.is_column_from_schema(&col));
1634 }
1635
1636 {
1638 let col = Column::from_name("c2");
1639 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1640 assert!(!schema.is_column_from_schema(&col));
1641 }
1642
1643 Ok(())
1644 }
1645
1646 #[test]
1647 fn test_datatype_is_logically_equal() {
1648 assert!(DFSchema::datatype_is_logically_equal(
1649 &DataType::Int8,
1650 &DataType::Int8
1651 ));
1652
1653 assert!(!DFSchema::datatype_is_logically_equal(
1654 &DataType::Int8,
1655 &DataType::Int16
1656 ));
1657
1658 assert!(DFSchema::datatype_is_logically_equal(
1662 &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1663 &DataType::List(Field::new("element", DataType::Int8, false).into())
1664 ));
1665
1666 assert!(!DFSchema::datatype_is_logically_equal(
1668 &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1669 &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1670 ));
1671
1672 let map_field = DataType::Map(
1674 Field::new(
1675 "entries",
1676 DataType::Struct(Fields::from(vec![
1677 Field::new("key", DataType::Int8, false),
1678 Field::new("value", DataType::Int8, true),
1679 ])),
1680 true,
1681 )
1682 .into(),
1683 true,
1684 );
1685
1686 assert!(DFSchema::datatype_is_logically_equal(
1688 &map_field,
1689 &DataType::Map(
1690 Field::new(
1691 "pairs",
1692 DataType::Struct(Fields::from(vec![
1693 Field::new("one", DataType::Int8, false),
1694 Field::new("two", DataType::Int8, false)
1695 ])),
1696 true
1697 )
1698 .into(),
1699 true
1700 )
1701 ));
1702 assert!(!DFSchema::datatype_is_logically_equal(
1704 &map_field,
1705 &DataType::Map(
1706 Field::new(
1707 "entries",
1708 DataType::Struct(Fields::from(vec![
1709 Field::new("key", DataType::Int8, false),
1710 Field::new("value", DataType::Int16, true)
1711 ])),
1712 true
1713 )
1714 .into(),
1715 true
1716 )
1717 ));
1718
1719 assert!(!DFSchema::datatype_is_logically_equal(
1721 &map_field,
1722 &DataType::Map(
1723 Field::new(
1724 "entries",
1725 DataType::Struct(Fields::from(vec![
1726 Field::new("key", DataType::Int16, false),
1727 Field::new("value", DataType::Int8, true)
1728 ])),
1729 true
1730 )
1731 .into(),
1732 true
1733 )
1734 ));
1735
1736 let struct_field = DataType::Struct(Fields::from(vec![
1739 Field::new("a", DataType::Int8, true),
1740 Field::new("b", DataType::Int8, true),
1741 ]));
1742
1743 assert!(DFSchema::datatype_is_logically_equal(
1745 &struct_field,
1746 &DataType::Struct(Fields::from(vec![
1747 Field::new("a", DataType::Int8, false),
1748 Field::new("b", DataType::Int8, true),
1749 ]))
1750 ));
1751
1752 assert!(!DFSchema::datatype_is_logically_equal(
1754 &struct_field,
1755 &DataType::Struct(Fields::from(vec![
1756 Field::new("x", DataType::Int8, true),
1757 Field::new("y", DataType::Int8, true),
1758 ]))
1759 ));
1760
1761 assert!(!DFSchema::datatype_is_logically_equal(
1763 &struct_field,
1764 &DataType::Struct(Fields::from(vec![
1765 Field::new("a", DataType::Int16, true),
1766 Field::new("b", DataType::Int8, true),
1767 ]))
1768 ));
1769
1770 assert!(!DFSchema::datatype_is_logically_equal(
1772 &struct_field,
1773 &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1774 ));
1775 }
1776
1777 #[test]
1778 fn test_datatype_is_logically_equivalent_to_dictionary() {
1779 assert!(DFSchema::datatype_is_logically_equal(
1781 &DataType::Utf8,
1782 &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1783 ));
1784 }
1785
1786 #[test]
1787 fn test_datatype_is_semantically_equal() {
1788 assert!(DFSchema::datatype_is_semantically_equal(
1789 &DataType::Int8,
1790 &DataType::Int8
1791 ));
1792
1793 assert!(!DFSchema::datatype_is_semantically_equal(
1794 &DataType::Int8,
1795 &DataType::Int16
1796 ));
1797
1798 assert!(DFSchema::datatype_is_semantically_equal(
1800 &DataType::Decimal32(1, 2),
1801 &DataType::Decimal32(2, 1),
1802 ));
1803
1804 assert!(DFSchema::datatype_is_semantically_equal(
1805 &DataType::Decimal64(1, 2),
1806 &DataType::Decimal64(2, 1),
1807 ));
1808
1809 assert!(DFSchema::datatype_is_semantically_equal(
1810 &DataType::Decimal128(1, 2),
1811 &DataType::Decimal128(2, 1),
1812 ));
1813
1814 assert!(DFSchema::datatype_is_semantically_equal(
1815 &DataType::Decimal256(1, 2),
1816 &DataType::Decimal256(2, 1),
1817 ));
1818
1819 assert!(DFSchema::datatype_is_semantically_equal(
1821 &DataType::Timestamp(
1822 arrow::datatypes::TimeUnit::Microsecond,
1823 Some("UTC".into())
1824 ),
1825 &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1826 ));
1827
1828 assert!(DFSchema::datatype_is_semantically_equal(
1832 &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1833 &DataType::List(Field::new("element", DataType::Int8, false).into())
1834 ));
1835
1836 assert!(!DFSchema::datatype_is_semantically_equal(
1838 &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1839 &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1840 ));
1841
1842 let map_field = DataType::Map(
1844 Field::new(
1845 "entries",
1846 DataType::Struct(Fields::from(vec![
1847 Field::new("key", DataType::Int8, false),
1848 Field::new("value", DataType::Int8, true),
1849 ])),
1850 true,
1851 )
1852 .into(),
1853 true,
1854 );
1855
1856 assert!(DFSchema::datatype_is_semantically_equal(
1858 &map_field,
1859 &DataType::Map(
1860 Field::new(
1861 "pairs",
1862 DataType::Struct(Fields::from(vec![
1863 Field::new("one", DataType::Int8, false),
1864 Field::new("two", DataType::Int8, false)
1865 ])),
1866 true
1867 )
1868 .into(),
1869 true
1870 )
1871 ));
1872 assert!(!DFSchema::datatype_is_semantically_equal(
1874 &map_field,
1875 &DataType::Map(
1876 Field::new(
1877 "entries",
1878 DataType::Struct(Fields::from(vec![
1879 Field::new("key", DataType::Int8, false),
1880 Field::new("value", DataType::Int16, true)
1881 ])),
1882 true
1883 )
1884 .into(),
1885 true
1886 )
1887 ));
1888
1889 assert!(!DFSchema::datatype_is_semantically_equal(
1891 &map_field,
1892 &DataType::Map(
1893 Field::new(
1894 "entries",
1895 DataType::Struct(Fields::from(vec![
1896 Field::new("key", DataType::Int16, false),
1897 Field::new("value", DataType::Int8, true)
1898 ])),
1899 true
1900 )
1901 .into(),
1902 true
1903 )
1904 ));
1905
1906 let struct_field = DataType::Struct(Fields::from(vec![
1909 Field::new("a", DataType::Int8, true),
1910 Field::new("b", DataType::Int8, true),
1911 ]));
1912
1913 assert!(DFSchema::datatype_is_logically_equal(
1915 &struct_field,
1916 &DataType::Struct(Fields::from(vec![
1917 Field::new("a", DataType::Int8, false),
1918 Field::new("b", DataType::Int8, true),
1919 ]))
1920 ));
1921
1922 assert!(!DFSchema::datatype_is_logically_equal(
1924 &struct_field,
1925 &DataType::Struct(Fields::from(vec![
1926 Field::new("x", DataType::Int8, true),
1927 Field::new("y", DataType::Int8, true),
1928 ]))
1929 ));
1930
1931 assert!(!DFSchema::datatype_is_logically_equal(
1933 &struct_field,
1934 &DataType::Struct(Fields::from(vec![
1935 Field::new("a", DataType::Int16, true),
1936 Field::new("b", DataType::Int8, true),
1937 ]))
1938 ));
1939
1940 assert!(!DFSchema::datatype_is_logically_equal(
1942 &struct_field,
1943 &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1944 ));
1945 }
1946
1947 #[test]
1948 fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1949 assert!(!DFSchema::datatype_is_semantically_equal(
1951 &DataType::Utf8,
1952 &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1953 ));
1954 }
1955
1956 fn test_schema_2() -> Schema {
1957 Schema::new(vec![
1958 Field::new("c100", DataType::Boolean, true),
1959 Field::new("c101", DataType::Boolean, true),
1960 ])
1961 }
1962
1963 fn test_metadata() -> HashMap<String, String> {
1964 test_metadata_n(2)
1965 }
1966
1967 fn test_metadata_n(n: usize) -> HashMap<String, String> {
1968 (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1969 }
1970
    #[test]
    fn test_print_schema_unqualified() {
        // Unqualified fields are printed by bare name, one line per field,
        // with lower-cased type names and their nullability.
        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, true),
                Field::new("age", DataType::Int64, true),
                Field::new("active", DataType::Boolean, false),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- id: int32 (nullable = false)
         |-- name: utf8 (nullable = true)
         |-- age: int64 (nullable = true)
         |-- active: boolean (nullable = false)
        ");
    }
1995
    #[test]
    fn test_print_schema_qualified() {
        // Fields of a qualified schema are printed as `qualifier.name`.
        let schema = DFSchema::try_from_qualified_schema(
            "table1",
            &Schema::new(vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, true),
            ]),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- table1.id: int32 (nullable = false)
         |-- table1.name: utf8 (nullable = true)
        ");
    }
2015
    #[test]
    fn test_print_schema_complex_types() {
        // One level of nesting: struct members and the list element field are
        // printed as indented children of their parent.
        let struct_field = Field::new(
            "address",
            DataType::Struct(Fields::from(vec![
                Field::new("street", DataType::Utf8, true),
                Field::new("city", DataType::Utf8, true),
            ])),
            true,
        );

        let list_field = Field::new(
            "tags",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("id", DataType::Int32, false),
                struct_field,
                list_field,
                Field::new("score", DataType::Decimal128(10, 2), true),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();
        // Decimals print their precision and scale.
        insta::assert_snapshot!(output, @r"
        root
         |-- id: int32 (nullable = false)
         |-- address: struct (nullable = true)
         |    |-- street: utf8 (nullable = true)
         |    |-- city: utf8 (nullable = true)
         |-- tags: list (nullable = true)
         |    |-- item: utf8 (nullable = true)
         |-- score: decimal128(10, 2) (nullable = true)
        ");
    }
2057
2058 #[test]
2059 fn test_print_schema_empty() {
2060 let schema = DFSchema::empty();
2061 let output = schema.tree_string();
2062 insta::assert_snapshot!(output, @r###"root"###);
2063 }
2064
    #[test]
    fn test_print_schema_deeply_nested_types() {
        // Struct with primitive members.
        let inner_struct = Field::new(
            "inner",
            DataType::Struct(Fields::from(vec![
                Field::new("level1", DataType::Utf8, true),
                Field::new("level2", DataType::Int32, false),
            ])),
            true,
        );

        // List whose element is a struct (two levels of indentation).
        let nested_list = Field::new(
            "nested_list",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("id", DataType::Int64, false),
                    Field::new("value", DataType::Float64, true),
                ])),
                true,
            ))),
            true,
        );

        // Map whose value is a list. The snapshot prints the map's key and
        // value directly under the map; the `entries` struct is not shown.
        let map_field = Field::new(
            "map_data",
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new(
                            "value",
                            DataType::List(Arc::new(Field::new(
                                "item",
                                DataType::Int32,
                                true,
                            ))),
                            true,
                        ),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("simple_field", DataType::Utf8, true),
                inner_struct,
                nested_list,
                map_field,
                Field::new(
                    "timestamp_field",
                    DataType::Timestamp(
                        arrow::datatypes::TimeUnit::Microsecond,
                        Some("UTC".into()),
                    ),
                    false,
                ),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        // Timestamps print the timezone but not the time unit.
        insta::assert_snapshot!(output, @r"
        root
         |-- simple_field: utf8 (nullable = true)
         |-- inner: struct (nullable = true)
         |    |-- level1: utf8 (nullable = true)
         |    |-- level2: int32 (nullable = false)
         |-- nested_list: list (nullable = true)
         |    |-- item: struct (nullable = true)
         |    |    |-- id: int64 (nullable = false)
         |    |    |-- value: float64 (nullable = true)
         |-- map_data: map (nullable = true)
         |    |-- key: utf8 (nullable = false)
         |    |-- value: list (nullable = true)
         |    |    |-- item: int32 (nullable = true)
         |-- timestamp_field: timestamp (UTC) (nullable = false)
        ");
    }
2153
    #[test]
    fn test_print_schema_mixed_qualified_unqualified() {
        // Qualified and unqualified fields interleaved: each line shows its
        // own qualifier (if any), and field order is preserved.
        let schema = DFSchema::new_with_metadata(
            vec![
                (
                    Some("table1".into()),
                    Arc::new(Field::new("id", DataType::Int32, false)),
                ),
                (None, Arc::new(Field::new("name", DataType::Utf8, true))),
                (
                    Some("table2".into()),
                    Arc::new(Field::new("score", DataType::Float64, true)),
                ),
                (
                    None,
                    Arc::new(Field::new("active", DataType::Boolean, false)),
                ),
            ],
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- table1.id: int32 (nullable = false)
         |-- name: utf8 (nullable = true)
         |-- table2.score: float64 (nullable = true)
         |-- active: boolean (nullable = false)
        ");
    }
2187
    #[test]
    fn test_print_schema_array_of_map() {
        // The map's `entries` struct: a non-nullable key/value pair.
        let map_field = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Utf8, false),
            ])),
            false,
        );

        // A list whose elements are maps: the map's key/value lines are nested
        // under the list's `item` line.
        let array_of_map_field = Field::new(
            "array_map_field",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Map(Arc::new(map_field), false),
                false,
            ))),
            false,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![array_of_map_field].into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- array_map_field: list (nullable = false)
         |    |-- item: map (nullable = false)
         |    |    |-- key: utf8 (nullable = false)
         |    |    |-- value: utf8 (nullable = false)
        ");
    }
2226
    #[test]
    fn test_print_schema_complex_type_combinations() {
        // List whose elements are structs.
        let list_of_structs = Field::new(
            "list_of_structs",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("id", DataType::Int32, false),
                    Field::new("name", DataType::Utf8, true),
                    Field::new("score", DataType::Float64, true),
                ])),
                true,
            ))),
            true,
        );

        // Struct whose members include two lists.
        let struct_with_lists = Field::new(
            "struct_with_lists",
            DataType::Struct(Fields::from(vec![
                Field::new(
                    "tags",
                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
                    true,
                ),
                Field::new(
                    "scores",
                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                    false,
                ),
                Field::new("metadata", DataType::Utf8, true),
            ])),
            false,
        );

        // Map whose values are structs.
        let map_with_struct_values = Field::new(
            "map_with_struct_values",
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new(
                            "value",
                            DataType::Struct(Fields::from(vec![
                                Field::new("count", DataType::Int64, false),
                                Field::new("active", DataType::Boolean, true),
                            ])),
                            true,
                        ),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        );

        // List whose elements are maps with a primitive value type.
        let list_of_maps = Field::new(
            "list_of_maps",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(Fields::from(vec![
                            Field::new("key", DataType::Utf8, false),
                            Field::new("value", DataType::Int32, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                true,
            ))),
            true,
        );

        // struct -> list -> struct -> map nesting.
        let deeply_nested = Field::new(
            "deeply_nested",
            DataType::Struct(Fields::from(vec![
                Field::new("level1", DataType::Utf8, true),
                Field::new(
                    "level2",
                    DataType::List(Arc::new(Field::new(
                        "item",
                        DataType::Struct(Fields::from(vec![
                            Field::new("id", DataType::Int32, false),
                            Field::new(
                                "properties",
                                DataType::Map(
                                    Arc::new(Field::new(
                                        "entries",
                                        DataType::Struct(Fields::from(vec![
                                            Field::new("key", DataType::Utf8, false),
                                            Field::new("value", DataType::Float64, true),
                                        ])),
                                        false,
                                    )),
                                    false,
                                ),
                                true,
                            ),
                        ])),
                        true,
                    ))),
                    false,
                ),
            ])),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                list_of_structs,
                struct_with_lists,
                map_with_struct_values,
                list_of_maps,
                deeply_nested,
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        // NOTE(review): `list_of_maps` and `properties` declare their primitive
        // map `value` fields as nullable, yet the snapshot records
        // `nullable = false` for them. Their enclosing `entries` fields are
        // non-nullable. The struct-valued `map_with_struct_values.value`
        // reports its own `nullable = true`. Confirm whether `tree_string` is
        // meant to report the value field's own nullability for primitive map
        // values; if so, these two snapshot lines encode a bug.
        insta::assert_snapshot!(output, @r"
        root
         |-- list_of_structs: list (nullable = true)
         |    |-- item: struct (nullable = true)
         |    |    |-- id: int32 (nullable = false)
         |    |    |-- name: utf8 (nullable = true)
         |    |    |-- score: float64 (nullable = true)
         |-- struct_with_lists: struct (nullable = false)
         |    |-- tags: list (nullable = true)
         |    |    |-- item: utf8 (nullable = true)
         |    |-- scores: list (nullable = false)
         |    |    |-- item: int32 (nullable = true)
         |    |-- metadata: utf8 (nullable = true)
         |-- map_with_struct_values: map (nullable = true)
         |    |-- key: utf8 (nullable = false)
         |    |-- value: struct (nullable = true)
         |    |    |-- count: int64 (nullable = false)
         |    |    |-- active: boolean (nullable = true)
         |-- list_of_maps: list (nullable = true)
         |    |-- item: map (nullable = true)
         |    |    |-- key: utf8 (nullable = false)
         |    |    |-- value: int32 (nullable = false)
         |-- deeply_nested: struct (nullable = true)
         |    |-- level1: utf8 (nullable = true)
         |    |-- level2: list (nullable = false)
         |    |    |-- item: struct (nullable = true)
         |    |    |    |-- id: int32 (nullable = false)
         |    |    |    |-- properties: map (nullable = true)
         |    |    |    |    |-- key: utf8 (nullable = false)
         |    |    |    |    |-- value: float64 (nullable = false)
        ");
    }
2392
    #[test]
    fn test_print_schema_edge_case_types() {
        // Less common scalar types plus a fixed-size list. Per the snapshot,
        // decimals keep precision/scale while time and fixed-size binary types
        // print without their unit/size parameters.
        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("null_field", DataType::Null, true),
                Field::new("binary_field", DataType::Binary, false),
                Field::new("large_binary", DataType::LargeBinary, true),
                Field::new("large_utf8", DataType::LargeUtf8, false),
                Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
                Field::new(
                    "fixed_size_list",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Int32, true)),
                        5,
                    ),
                    false,
                ),
                Field::new("decimal32", DataType::Decimal32(9, 4), true),
                Field::new("decimal64", DataType::Decimal64(9, 4), true),
                Field::new("decimal128", DataType::Decimal128(18, 4), true),
                Field::new("decimal256", DataType::Decimal256(38, 10), false),
                Field::new("date32", DataType::Date32, true),
                Field::new("date64", DataType::Date64, false),
                Field::new(
                    "time32_seconds",
                    DataType::Time32(arrow::datatypes::TimeUnit::Second),
                    true,
                ),
                Field::new(
                    "time64_nanoseconds",
                    DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
                    false,
                ),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
         |-- null_field: null (nullable = true)
         |-- binary_field: binary (nullable = false)
         |-- large_binary: large_binary (nullable = true)
         |-- large_utf8: large_utf8 (nullable = false)
         |-- fixed_size_binary: fixed_size_binary (nullable = true)
         |-- fixed_size_list: fixed size list (nullable = false)
         |    |-- item: int32 (nullable = true)
         |-- decimal32: decimal32(9, 4) (nullable = true)
         |-- decimal64: decimal64(9, 4) (nullable = true)
         |-- decimal128: decimal128(18, 4) (nullable = true)
         |-- decimal256: decimal256(38, 10) (nullable = false)
         |-- date32: date32 (nullable = true)
         |-- date64: date64 (nullable = false)
         |-- time32_seconds: time32 (nullable = true)
         |-- time64_nanoseconds: time64 (nullable = false)
        ");
    }
2454}