use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
use crate::{
field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
SchemaError, TableReference,
};
use arrow::compute::can_cast_types;
use arrow::datatypes::{
DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
};
/// A thread-safe, reference-counted [`DFSchema`].
pub type DFSchemaRef = Arc<DFSchema>;

/// DataFusion's schema type: an Arrow [`Schema`] plus, per field, an optional
/// relation (table) qualifier and the schema's functional dependencies.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// The underlying Arrow schema (fields + metadata).
    inner: SchemaRef,
    /// One entry per field in `inner`: the table qualifier, if any.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Functional dependencies known to hold over these fields.
    functional_dependencies: FunctionalDependencies,
}
impl DFSchema {
/// Creates a `DFSchema` with no fields, no metadata and no
/// functional dependencies.
pub fn empty() -> Self {
    Self {
        inner: Arc::new(Schema::empty()),
        field_qualifiers: Vec::new(),
        functional_dependencies: FunctionalDependencies::empty(),
    }
}
/// Returns a reference to the underlying Arrow [`Schema`].
pub fn as_arrow(&self) -> &Schema {
    self.inner.as_ref()
}
/// Returns a reference to the shared Arrow [`SchemaRef`] (no clone).
pub fn inner(&self) -> &SchemaRef {
    &self.inner
}
/// Builds a `DFSchema` from `(qualifier, field)` pairs plus schema-level
/// metadata, validating that no field name is duplicated or ambiguous.
///
/// # Errors
/// Returns a schema error if `check_names` detects duplicate qualified,
/// duplicate unqualified, or ambiguous field names.
pub fn new_with_metadata(
    qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
    metadata: HashMap<String, String>,
) -> Result<Self> {
    let mut qualifiers = Vec::with_capacity(qualified_fields.len());
    let mut fields = Vec::with_capacity(qualified_fields.len());
    for (qualifier, field) in qualified_fields {
        qualifiers.push(qualifier);
        fields.push(field);
    }
    let dfschema = Self {
        inner: Arc::new(Schema::new_with_metadata(fields, metadata)),
        field_qualifiers: qualifiers,
        functional_dependencies: FunctionalDependencies::empty(),
    };
    dfschema.check_names()?;
    Ok(dfschema)
}
/// Builds a `DFSchema` where every field is unqualified.
///
/// # Errors
/// Returns a schema error if any unqualified field name is duplicated.
pub fn from_unqualified_fields(
    fields: Fields,
    metadata: HashMap<String, String>,
) -> Result<Self> {
    // Capture the count before `fields` is moved into the Arrow schema.
    let field_qualifiers = vec![None; fields.len()];
    let dfschema = Self {
        inner: Arc::new(Schema::new_with_metadata(fields, metadata)),
        field_qualifiers,
        functional_dependencies: FunctionalDependencies::empty(),
    };
    dfschema.check_names()?;
    Ok(dfschema)
}
pub fn try_from_qualified_schema(
qualifier: impl Into<TableReference>,
schema: &Schema,
) -> Result<Self> {
let qualifier = qualifier.into();
let schema = DFSchema {
inner: schema.clone().into(),
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
schema.check_names()?;
Ok(schema)
}
/// Builds a `DFSchema` from an Arrow schema plus a per-field list of
/// optional qualifiers (one entry per field, in order).
///
/// # Errors
/// Returns a schema error if the resulting names are duplicated or
/// ambiguous.
pub fn from_field_specific_qualified_schema(
    qualifiers: Vec<Option<TableReference>>,
    schema: &SchemaRef,
) -> Result<Self> {
    let dfschema = Self {
        inner: Arc::clone(schema),
        field_qualifiers: qualifiers,
        functional_dependencies: FunctionalDependencies::empty(),
    };
    dfschema.check_names().map(|_| dfschema)
}
/// Returns a copy of this schema with the given per-field qualifiers,
/// keeping the underlying Arrow schema and functional dependencies.
///
/// # Errors
/// Returns a plan error if `qualifiers` does not have exactly one entry
/// per field.
pub fn with_field_specific_qualified_schema(
    &self,
    qualifiers: Vec<Option<TableReference>>,
) -> Result<Self> {
    let expected = self.fields().len();
    if expected == qualifiers.len() {
        Ok(DFSchema {
            inner: Arc::clone(&self.inner),
            field_qualifiers: qualifiers,
            functional_dependencies: self.functional_dependencies.clone(),
        })
    } else {
        _plan_err!(
            "Number of qualifiers must match number of fields. Expected {}, got {}",
            expected,
            qualifiers.len()
        )
    }
}
/// Validates that field names are unique and unambiguous.
///
/// Two passes:
/// 1. every `(qualifier, name)` pair and every unqualified name must be
///    unique within its own group;
/// 2. a qualified name must not collide with an unqualified name, since a
///    bare reference to it would then be ambiguous.
///
/// # Errors
/// `DuplicateQualifiedField`, `DuplicateUnqualifiedField`, or
/// `AmbiguousReference` accordingly.
pub fn check_names(&self) -> Result<()> {
    // BTreeSets give deterministic iteration order for the second pass.
    let mut qualified_names = BTreeSet::new();
    let mut unqualified_names = BTreeSet::new();
    for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
        if let Some(qualifier) = qualifier {
            // `insert` returns false when the pair was already present.
            if !qualified_names.insert((qualifier, field.name())) {
                return _schema_err!(SchemaError::DuplicateQualifiedField {
                    qualifier: Box::new(qualifier.clone()),
                    name: field.name().to_string(),
                });
            }
        } else if !unqualified_names.insert(field.name()) {
            return _schema_err!(SchemaError::DuplicateUnqualifiedField {
                name: field.name().to_string()
            });
        }
    }
    // Cross-check: qualified `t.c` plus unqualified `c` would make a bare
    // `c` reference ambiguous.
    for (qualifier, name) in qualified_names {
        if unqualified_names.contains(name) {
            return _schema_err!(SchemaError::AmbiguousReference {
                field: Box::new(Column::new(Some(qualifier.clone()), name))
            });
        }
    }
    Ok(())
}
/// Attaches functional dependencies to this schema, consuming `self`.
///
/// # Errors
/// Returns a plan error if the dependencies reference fields outside
/// this schema (as judged by `FunctionalDependencies::is_valid`).
pub fn with_functional_dependencies(
    mut self,
    functional_dependencies: FunctionalDependencies,
) -> Result<Self> {
    if !functional_dependencies.is_valid(self.inner.fields.len()) {
        return _plan_err!(
            "Invalid functional dependency: {:?}",
            functional_dependencies
        );
    }
    self.functional_dependencies = functional_dependencies;
    Ok(self)
}
/// Concatenates two schemas: all of `self`'s fields followed by all of
/// `schema`'s, merging metadata (the right side wins on key collisions).
/// Functional dependencies are not carried over.
///
/// # Errors
/// Returns a schema error if the combined schema has duplicate or
/// ambiguous field names.
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
    let mut builder = SchemaBuilder::new();
    builder.extend(self.inner.fields().iter().cloned());
    builder.extend(schema.fields().iter().cloned());

    let mut metadata = self.inner.metadata.clone();
    metadata.extend(schema.inner.metadata.clone());
    let merged = builder.finish().with_metadata(metadata);

    let mut qualifiers = Vec::with_capacity(
        self.field_qualifiers.len() + schema.field_qualifiers.len(),
    );
    qualifiers.extend_from_slice(&self.field_qualifiers);
    qualifiers.extend_from_slice(&schema.field_qualifiers);

    let joined = Self {
        inner: Arc::new(merged),
        field_qualifiers: qualifiers,
        functional_dependencies: FunctionalDependencies::empty(),
    };
    joined.check_names()?;
    Ok(joined)
}
/// Merges `other_schema`'s fields into `self`, skipping duplicates.
///
/// Duplicate detection is asymmetric: a *qualified* incoming field is a
/// duplicate only if `self` has the same `(qualifier, field)` pair
/// (full `FieldRef` equality), while an *unqualified* incoming field is a
/// duplicate if `self` has any field with the same name, qualified or not.
/// Metadata from `other_schema` overwrites colliding keys.
/// Functional dependencies are left untouched.
pub fn merge(&mut self, other_schema: &DFSchema) {
    if other_schema.inner.fields.is_empty() {
        return;
    }
    // Lookup sets built once up front so the loop below is O(1) per field.
    let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
        self.iter().collect();
    let self_unqualified_names: HashSet<&str> = self
        .inner
        .fields
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
    let mut qualifiers = Vec::new();
    for (qualifier, field) in other_schema.iter() {
        // Qualified: exact (qualifier, field) match; unqualified: by name.
        let duplicated_field = match qualifier {
            Some(q) => self_fields.contains(&(Some(q), field)),
            None => self_unqualified_names.contains(field.name().as_str()),
        };
        if !duplicated_field {
            schema_builder.push(Arc::clone(field));
            qualifiers.push(qualifier.cloned());
        }
    }
    let mut metadata = self.inner.metadata.clone();
    metadata.extend(other_schema.inner.metadata.clone());
    let finished = schema_builder.finish();
    let finished_with_metadata = finished.with_metadata(metadata);
    self.inner = finished_with_metadata.into();
    // Keep `field_qualifiers` aligned with the newly appended fields.
    self.field_qualifiers.extend(qualifiers);
}
/// Returns all fields of the underlying Arrow schema.
pub fn fields(&self) -> &Fields {
    &self.inner.fields
}
/// Returns the field at index `i`. Panics if `i` is out of bounds.
pub fn field(&self, i: usize) -> &Field {
    &self.inner.fields[i]
}
/// Returns the field at index `i` together with its optional qualifier.
/// Panics if `i` is out of bounds.
pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
    (self.field_qualifiers[i].as_ref(), self.field(i))
}
/// Finds the index of the first field matching `name` under the given
/// qualifier rules, or `None` if there is no match.
///
/// A `Some(qualifier)` request matches only fields carrying a resolved-equal
/// qualifier; a `None` request matches by name regardless of qualifier.
pub fn index_of_column_by_name(
    &self,
    qualifier: Option<&TableReference>,
    name: &str,
) -> Option<usize> {
    self.iter()
        .position(|(field_q, field)| match (qualifier, field_q) {
            (Some(wanted), Some(actual)) => {
                wanted.resolved_eq(actual) && field.name() == name
            }
            // A qualified request never matches an unqualified field.
            (Some(_), None) => false,
            // An unqualified request matches by name alone.
            (None, _) => field.name() == name,
        })
}
/// Returns the index of `col` in this schema, or `None` if absent.
pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
    self.index_of_column_by_name(col.relation.as_ref(), &col.name)
}
/// Returns the index of `col`, or a "field not found" error if absent.
pub fn index_of_column(&self, col: &Column) -> Result<usize> {
    self.maybe_index_of_column(col)
        .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
}
/// Returns true if `col` resolves to a field of this schema.
pub fn is_column_from_schema(&self, col: &Column) -> bool {
    self.index_of_column_by_name(col.relation.as_ref(), &col.name)
        .is_some()
}
/// Looks up a field by name, qualified or not depending on `qualifier`.
///
/// # Errors
/// Returns a "field not found" or ambiguity error from the delegated
/// lookup when no single field matches.
pub fn field_with_name(
    &self,
    qualifier: Option<&TableReference>,
    name: &str,
) -> Result<&Field> {
    match qualifier {
        Some(q) => self.field_with_qualified_name(q, name),
        None => self.field_with_unqualified_name(name),
    }
}

/// Looks up a field and its qualifier by name.
///
/// # Errors
/// Returns a "field not found" error for a missing qualified lookup, or
/// an ambiguity error when an unqualified name matches several fields.
pub fn qualified_field_with_name(
    &self,
    qualifier: Option<&TableReference>,
    name: &str,
) -> Result<(Option<&TableReference>, &Field)> {
    match qualifier {
        Some(q) => match self.index_of_column_by_name(Some(q), name) {
            Some(idx) => Ok(self.qualified_field(idx)),
            None => Err(field_not_found(Some(q.clone()), name, self)),
        },
        None => self.qualified_field_with_unqualified_name(name),
    }
}
/// Returns all fields whose qualifier equals `qualifier`.
pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
    self.iter()
        .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
        .map(|(_, f)| f.as_ref())
        .collect()
}
/// Returns the indices of all fields whose qualifier equals `qualifier`.
pub fn fields_indices_with_qualified(
    &self,
    qualifier: &TableReference,
) -> Vec<usize> {
    self.iter()
        .enumerate()
        .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
        .collect()
}
/// Returns all fields named `name`, ignoring qualifiers.
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
    self.fields()
        .iter()
        .filter(|field| field.name() == name)
        .map(|f| f.as_ref())
        .collect()
}
/// Returns all fields named `name` together with their qualifiers.
pub fn qualified_fields_with_unqualified_name(
    &self,
    name: &str,
) -> Vec<(Option<&TableReference>, &Field)> {
    self.iter()
        .filter(|(_, field)| field.name() == name)
        .map(|(qualifier, field)| (qualifier, field.as_ref()))
        .collect()
}
/// Returns owned `Column`s for every field named `name`.
pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
    self.iter()
        .filter(|(_, field)| field.name() == name)
        .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
        .collect()
}
/// Returns owned `Column`s for every field in the schema, in order.
pub fn columns(&self) -> Vec<Column> {
    self.iter()
        .map(|(qualifier, field)| {
            Column::new(qualifier.cloned(), field.name().clone())
        })
        .collect()
}
/// Resolves an unqualified `name` to a single field.
///
/// Resolution rules:
/// - no match → "field not found" error;
/// - exactly one match → that field;
/// - several matches → the single *unqualified* match wins if there is
///   exactly one; otherwise the reference is ambiguous.
///
/// # Errors
/// `unqualified_field_not_found` or `SchemaError::AmbiguousReference`.
pub fn qualified_field_with_unqualified_name(
    &self,
    name: &str,
) -> Result<(Option<&TableReference>, &Field)> {
    let matches = self.qualified_fields_with_unqualified_name(name);
    match matches.len() {
        0 => Err(unqualified_field_not_found(name, self)),
        1 => Ok((matches[0].0, matches[0].1)),
        _ => {
            // Multiple candidates: prefer the unqualified one, if unique.
            let fields_without_qualifier = matches
                .iter()
                .filter(|(q, _)| q.is_none())
                .collect::<Vec<_>>();
            if fields_without_qualifier.len() == 1 {
                Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
            } else {
                _schema_err!(SchemaError::AmbiguousReference {
                    field: Box::new(Column::new_unqualified(name.to_string()))
                })
            }
        }
    }
}
/// Resolves an unqualified `name` to a single field, dropping the
/// qualifier from the result.
///
/// # Errors
/// Propagates "not found" / ambiguity errors from the qualified lookup.
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
    let (_, field) = self.qualified_field_with_unqualified_name(name)?;
    Ok(field)
}

/// Resolves a fully-qualified `(qualifier, name)` pair to a field.
///
/// # Errors
/// Returns a "field not found" error when no such field exists.
pub fn field_with_qualified_name(
    &self,
    qualifier: &TableReference,
    name: &str,
) -> Result<&Field> {
    self.qualified_field_with_name(Some(qualifier), name)
        .map(|(_, field)| field)
}
/// Resolves a `Column` to its `(qualifier, field)` pair.
///
/// # Errors
/// Propagates "not found" / ambiguity errors from the name lookup.
pub fn qualified_field_from_column(
    &self,
    column: &Column,
) -> Result<(Option<&TableReference>, &Field)> {
    self.qualified_field_with_name(column.relation.as_ref(), &column.name)
}

/// True if any field (qualified or not) is named `name`.
pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
    self.fields().iter().any(|f| name == f.name())
}

/// True if a field named `name` exists under exactly `qualifier`.
pub fn has_column_with_qualified_name(
    &self,
    qualifier: &TableReference,
    name: &str,
) -> bool {
    self.iter()
        .any(|(q, f)| f.name() == name && q.map_or(false, |q| q == qualifier))
}

/// True if `column` (qualified or not) names a field of this schema.
pub fn has_column(&self, column: &Column) -> bool {
    if let Some(relation) = &column.relation {
        self.has_column_with_qualified_name(relation, &column.name)
    } else {
        self.has_column_with_unqualified_name(&column.name)
    }
}
/// Compares field *names* (only) against an Arrow schema, position by
/// position.
///
/// NOTE(review): `zip` stops at the shorter of the two field lists, so
/// schemas of different lengths can still compare as matching here.
pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
    self.inner
        .fields
        .iter()
        .zip(arrow_schema.fields().iter())
        .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
}
/// Checks that each field of `arrow_schema` can be cast to the
/// corresponding field of `self` (position by position).
///
/// Note the cast direction: `can_cast_types(right, left)` — the incoming
/// schema's type must be castable *to* this schema's type. Like
/// `matches_arrow_schema`, `zip` stops at the shorter field list.
#[deprecated(since = "47.0.0", note = "This method is no longer used")]
pub fn check_arrow_schema_type_compatible(
    &self,
    arrow_schema: &Schema,
) -> Result<()> {
    let self_arrow_schema = self.as_arrow();
    self_arrow_schema
        .fields()
        .iter()
        .zip(arrow_schema.fields().iter())
        .try_for_each(|(l_field, r_field)| {
            if !can_cast_types(r_field.data_type(), l_field.data_type()) {
                _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
                    r_field.name(),
                    r_field.data_type(),
                    l_field.name(),
                    l_field.data_type())
            } else {
                Ok(())
            }
        })
}
/// True when both schemas have the same length and every pair of fields
/// agrees on qualifier, name, and *logically* equal data type
/// (see [`Self::datatype_is_logically_equal`]).
pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
    self.fields().len() == other.fields().len()
        && self
            .iter()
            .zip(other.iter())
            .all(|((q1, f1), (q2, f2))| {
                q1 == q2
                    && f1.name() == f2.name()
                    && Self::datatype_is_logically_equal(
                        f1.data_type(),
                        f2.data_type(),
                    )
            })
}
/// Returns true when [`Self::has_equivalent_names_and_types`] succeeds.
///
/// Fix: the deprecation note previously contained a single stray
/// backtick (`has_equivalent_names_and_types`` instead`), which renders
/// as broken markdown in rustdoc/compiler output; the identifier is now
/// properly backtick-quoted.
#[deprecated(
    since = "47.0.0",
    note = "Use `has_equivalent_names_and_types` instead"
)]
pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
    self.has_equivalent_names_and_types(other).is_ok()
}
/// Checks that both schemas have the same length and that each pair of
/// fields agrees on name and *semantically* equal data type
/// (see [`Self::datatype_is_semantically_equal`]). Qualifiers are not
/// compared here.
///
/// # Errors
/// A plan error describing the first length or field mismatch.
pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
    if self.fields().len() != other.fields().len() {
        _plan_err!(
            "Schema mismatch: the schema length are not same \
        Expected schema length: {}, got: {}",
            self.fields().len(),
            other.fields().len()
        )
    } else {
        self.fields()
            .iter()
            .zip(other.fields().iter())
            .try_for_each(|(f1, f2)| {
                if f1.name() != f2.name()
                    || (!DFSchema::datatype_is_semantically_equal(
                        f1.data_type(),
                        f2.data_type(),
                    ))
                {
                    _plan_err!(
                        "Schema mismatch: Expected field '{}' with type {}, \
                        but got '{}' with type {}.",
                        f1.name(),
                        f1.data_type(),
                        f2.name(),
                        f2.data_type()
                    )
                } else {
                    Ok(())
                }
            })
    }
}
/// "Logical" type equality: types that hold the same logical values are
/// considered equal even if their physical representation differs.
///
/// Specifically (as implemented below): dictionaries compare by value
/// type only (the key type is ignored); `Utf8` and `Utf8View` are equal;
/// list-like, map, struct, and union types recurse on their children;
/// everything else falls back to [`Self::datatype_is_semantically_equal`].
///
/// # Panics
/// Panics if a `Map` type's inner field is not a `Struct` (malformed
/// Arrow data).
pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
    match (dt1, dt2) {
        // Dictionary vs dictionary: only the value types must agree.
        (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
            v1.as_ref() == v2.as_ref()
        }
        // Dictionary vs plain type: compare the dictionary's value type.
        (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
        (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
        // List-likes: recurse on the element type (FixedSizeList ignores
        // its size here).
        (DataType::List(f1), DataType::List(f2))
        | (DataType::LargeList(f1), DataType::LargeList(f2))
        | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
            Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
        }
        (DataType::Map(f1, _), DataType::Map(f2, _)) => {
            match (f1.data_type(), f2.data_type()) {
                (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                    f1_inner.len() == f2_inner.len()
                        && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                            Self::datatype_is_logically_equal(
                                f1.data_type(),
                                f2.data_type(),
                            )
                        })
                }
                _ => panic!("Map type should have an inner struct field"),
            }
        }
        // Structs: same arity and pairwise logically-equal fields
        // (names included — see field_is_logically_equal).
        (DataType::Struct(fields1), DataType::Struct(fields2)) => {
            let iter1 = fields1.iter();
            let iter2 = fields2.iter();
            fields1.len() == fields2.len() &&
                iter1
                    .zip(iter2)
                    .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
        }
        // Unions: type ids must match pairwise as well.
        (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
            let iter1 = fields1.iter();
            let iter2 = fields2.iter();
            fields1.len() == fields2.len() &&
                iter1
                    .zip(iter2)
                    .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
        }
        // Utf8 and Utf8View carry the same logical string values.
        (DataType::Utf8, DataType::Utf8View) => true,
        (DataType::Utf8View, DataType::Utf8) => true,
        _ => Self::datatype_is_semantically_equal(dt1, dt2),
    }
}
/// "Semantic" type equality: stricter than logical equality (dictionary
/// key types must match), but deliberately looser than `==` in that all
/// Decimal widths ignore precision/scale and Timestamps ignore unit and
/// timezone (those pairs compare as equal by construction below).
///
/// # Panics
/// Panics if a `Map` type's inner field is not a `Struct` (malformed
/// Arrow data).
pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
    match (dt1, dt2) {
        // Unlike the logical comparison, dictionary *key* types matter.
        (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
            Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
                && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
        }
        // List-likes recurse on the element type (FixedSizeList size ignored).
        (DataType::List(f1), DataType::List(f2))
        | (DataType::LargeList(f1), DataType::LargeList(f2))
        | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
            Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
        }
        (DataType::Map(f1, _), DataType::Map(f2, _)) => {
            match (f1.data_type(), f2.data_type()) {
                (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
                    f1_inner.len() == f2_inner.len()
                        && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
                            Self::datatype_is_semantically_equal(
                                f1.data_type(),
                                f2.data_type(),
                            )
                        })
                }
                _ => panic!("Map type should have an inner struct field"),
            }
        }
        (DataType::Struct(fields1), DataType::Struct(fields2)) => {
            let iter1 = fields1.iter();
            let iter2 = fields2.iter();
            fields1.len() == fields2.len() &&
                iter1
                    .zip(iter2)
                    .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
        }
        (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
            let iter1 = fields1.iter();
            let iter2 = fields2.iter();
            fields1.len() == fields2.len() &&
                iter1
                    .zip(iter2)
                    .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
        }
        // Decimals of the same width: precision and scale are ignored.
        (
            DataType::Decimal32(_l_precision, _l_scale),
            DataType::Decimal32(_r_precision, _r_scale),
        ) => true,
        (
            DataType::Decimal64(_l_precision, _l_scale),
            DataType::Decimal64(_r_precision, _r_scale),
        ) => true,
        (
            DataType::Decimal128(_l_precision, _l_scale),
            DataType::Decimal128(_r_precision, _r_scale),
        ) => true,
        (
            DataType::Decimal256(_l_precision, _l_scale),
            DataType::Decimal256(_r_precision, _r_scale),
        ) => true,
        // Timestamps: time unit and timezone are ignored.
        (
            DataType::Timestamp(_l_time_unit, _l_timezone),
            DataType::Timestamp(_r_time_unit, _r_timezone),
        ) => true,
        _ => dt1 == dt2,
    }
}
/// Field-level logical equality: same name and logically equal type.
/// Nullability and field metadata are not compared.
fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
    f1.name() == f2.name()
        && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
}
/// Field-level semantic equality: same name and semantically equal type.
/// Nullability and field metadata are not compared.
fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
    f1.name() == f2.name()
        && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
}
/// Consumes `self` and returns the same schema with every field
/// qualifier removed.
pub fn strip_qualifiers(self) -> Self {
    let field_count = self.inner.fields.len();
    DFSchema {
        inner: self.inner,
        field_qualifiers: vec![None; field_count],
        functional_dependencies: self.functional_dependencies,
    }
}

/// Consumes `self` and returns the same schema with every field
/// qualified by `qualifier`.
pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
    let qualifier = qualifier.into();
    let field_count = self.inner.fields.len();
    DFSchema {
        inner: self.inner,
        field_qualifiers: vec![Some(qualifier); field_count],
        functional_dependencies: self.functional_dependencies,
    }
}
/// Returns the (possibly qualified) display name of every field,
/// in schema order.
pub fn field_names(&self) -> Vec<String> {
    self.iter()
        .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
        .collect()
}

/// Returns the schema-level metadata map.
pub fn metadata(&self) -> &HashMap<String, String> {
    self.inner.metadata()
}

/// Returns the functional dependencies attached to this schema.
pub fn functional_dependencies(&self) -> &FunctionalDependencies {
    &self.functional_dependencies
}

/// Iterates over `(qualifier, field)` pairs in schema order.
pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
    self.field_qualifiers
        .iter()
        .map(Option::as_ref)
        .zip(self.inner.fields().iter())
}
/// Renders the schema as a Spark-like tree, e.g.
/// `root` followed by one ` |-- name: type (nullable = …)` line per
/// field, recursing into nested types. Qualified fields are shown as
/// `qualifier.name`. The trailing newline is trimmed.
pub fn tree_string(&self) -> impl Display + '_ {
    let mut result = String::from("root\n");
    for (qualifier, field) in self.iter() {
        let field_name = match qualifier {
            Some(q) => format!("{}.{}", q, field.name()),
            None => field.name().to_string(),
        };
        format_field_with_indent(
            &mut result,
            &field_name,
            field.data_type(),
            field.is_nullable(),
            " ",
        );
    }
    // Drop the final newline so the output can be embedded cleanly.
    if result.ends_with('\n') {
        result.pop();
    }
    result
}
}
/// Appends one tree line for `field_name` to `result` and recurses into
/// nested types (list, map, struct) with a deeper indent.
///
/// `indent` is the prefix for this level; children get `indent + "| "`.
/// Output mimics Spark's `printSchema` style.
fn format_field_with_indent(
    result: &mut String,
    field_name: &str,
    data_type: &DataType,
    nullable: bool,
    indent: &str,
) {
    let nullable_str = nullable.to_string().to_lowercase();
    let child_indent = format!("{indent}| ");
    match data_type {
        // Lists print a header line, then recurse into the element field.
        DataType::List(field) => {
            result.push_str(&format!(
                "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
            ));
            format_field_with_indent(
                result,
                field.name(),
                field.data_type(),
                field.is_nullable(),
                &child_indent,
            );
        }
        DataType::LargeList(field) => {
            result.push_str(&format!(
                "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
            ));
            format_field_with_indent(
                result,
                field.name(),
                field.data_type(),
                field.is_nullable(),
                &child_indent,
            );
        }
        DataType::FixedSizeList(field, _size) => {
            result.push_str(&format!(
                "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
            ));
            format_field_with_indent(
                result,
                field.name(),
                field.data_type(),
                field.is_nullable(),
                &child_indent,
            );
        }
        // Maps print "key" and "value" children from the inner struct's
        // two fields; a map whose inner type is not a 2-field struct
        // prints only the header line.
        DataType::Map(field, _) => {
            result.push_str(&format!(
                "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
            ));
            if let DataType::Struct(inner_fields) = field.data_type() {
                if inner_fields.len() == 2 {
                    format_field_with_indent(
                        result,
                        "key",
                        inner_fields[0].data_type(),
                        inner_fields[0].is_nullable(),
                        &child_indent,
                    );
                    // NOTE: the value's nullability is taken from the map's
                    // inner *entries* field, not from the value field itself.
                    let value_contains_null =
                        field.is_nullable().to_string().to_lowercase();
                    match inner_fields[1].data_type() {
                        // Nested value types recurse like any other field.
                        DataType::Struct(_)
                        | DataType::List(_)
                        | DataType::LargeList(_)
                        | DataType::FixedSizeList(_, _)
                        | DataType::Map(_, _) => {
                            format_field_with_indent(
                                result,
                                "value",
                                inner_fields[1].data_type(),
                                inner_fields[1].is_nullable(),
                                &child_indent,
                            );
                        }
                        // Scalar value types print a single line.
                        _ => {
                            result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
                                format_simple_data_type(inner_fields[1].data_type())));
                        }
                    }
                }
            }
        }
        // Structs recurse into each member field.
        DataType::Struct(fields) => {
            result.push_str(&format!(
                "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
            ));
            for struct_field in fields {
                format_field_with_indent(
                    result,
                    struct_field.name(),
                    struct_field.data_type(),
                    struct_field.is_nullable(),
                    &child_indent,
                );
            }
        }
        // Leaf types print a single line with the simple type name.
        _ => {
            let type_str = format_simple_data_type(data_type);
            result.push_str(&format!(
                "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
            ));
        }
    }
}
/// Returns a short, lowercase display name for a scalar `DataType`, as
/// used by the schema tree printer. Dictionaries are displayed as their
/// value type; anything unrecognized falls back to the lowercased
/// `Display` output of the type.
fn format_simple_data_type(data_type: &DataType) -> String {
    match data_type {
        DataType::Boolean => String::from("boolean"),
        DataType::Int8 => String::from("int8"),
        DataType::Int16 => String::from("int16"),
        DataType::Int32 => String::from("int32"),
        DataType::Int64 => String::from("int64"),
        DataType::UInt8 => String::from("uint8"),
        DataType::UInt16 => String::from("uint16"),
        DataType::UInt32 => String::from("uint32"),
        DataType::UInt64 => String::from("uint64"),
        DataType::Float16 => String::from("float16"),
        DataType::Float32 => String::from("float32"),
        DataType::Float64 => String::from("float64"),
        DataType::Utf8 => String::from("utf8"),
        DataType::LargeUtf8 => String::from("large_utf8"),
        DataType::Binary => String::from("binary"),
        DataType::LargeBinary => String::from("large_binary"),
        DataType::FixedSizeBinary(_) => String::from("fixed_size_binary"),
        DataType::Date32 => String::from("date32"),
        DataType::Date64 => String::from("date64"),
        DataType::Time32(_) => String::from("time32"),
        DataType::Time64(_) => String::from("time64"),
        // Timezone-aware timestamps show the zone in parentheses.
        DataType::Timestamp(_, tz) => match tz {
            Some(tz_str) => format!("timestamp ({tz_str})"),
            None => String::from("timestamp"),
        },
        DataType::Interval(_) => String::from("interval"),
        // Dictionaries display as their value type.
        DataType::Dictionary(_, value_type) => {
            format_simple_data_type(value_type.as_ref())
        }
        DataType::Decimal32(precision, scale) => {
            format!("decimal32({precision}, {scale})")
        }
        DataType::Decimal64(precision, scale) => {
            format!("decimal64({precision}, {scale})")
        }
        DataType::Decimal128(precision, scale) => {
            format!("decimal128({precision}, {scale})")
        }
        DataType::Decimal256(precision, scale) => {
            format!("decimal256({precision}, {scale})")
        }
        DataType::Null => String::from("null"),
        _ => data_type.to_string().to_lowercase(),
    }
}
/// Allows borrowing the underlying Arrow [`Schema`] from a `DFSchema`.
impl AsRef<Schema> for DFSchema {
    fn as_ref(&self) -> &Schema {
        self.inner.as_ref()
    }
}

/// Allows borrowing the shared Arrow [`SchemaRef`] from a `DFSchema`.
impl AsRef<SchemaRef> for DFSchema {
    fn as_ref(&self) -> &SchemaRef {
        &self.inner
    }
}
impl TryFrom<Schema> for DFSchema {
type Error = DataFusionError;
fn try_from(schema: Schema) -> Result<Self, Self::Error> {
Self::try_from(Arc::new(schema))
}
}
impl TryFrom<SchemaRef> for DFSchema {
type Error = DataFusionError;
fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
let field_count = schema.fields.len();
let dfschema = Self {
inner: schema,
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
};
Ok(dfschema)
}
}
impl Hash for DFSchema {
    /// Hashes the fields plus only the *length* of the metadata map:
    /// `HashMap` itself does not implement `Hash`, so the map's contents
    /// cannot be folded in directly. Qualifiers and functional
    /// dependencies do not participate in the hash.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.inner.fields.hash(state);
        self.inner.metadata.len().hash(state);
    }
}
/// Conversion into a [`DFSchema`], with a default adapter that wraps the
/// result in an [`Arc`] to produce a [`DFSchemaRef`].
pub trait ToDFSchema
where
    Self: Sized,
{
    /// Converts `self` into a [`DFSchema`].
    fn to_dfschema(self) -> Result<DFSchema>;

    /// Converts `self` into a shared [`DFSchemaRef`].
    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
        self.to_dfschema().map(Arc::new)
    }
}
/// Delegates to `TryFrom<Schema>`: all fields end up unqualified.
impl ToDFSchema for Schema {
    fn to_dfschema(self) -> Result<DFSchema> {
        DFSchema::try_from(self)
    }
}
/// Delegates to `TryFrom<SchemaRef>`: all fields end up unqualified.
impl ToDFSchema for SchemaRef {
    fn to_dfschema(self) -> Result<DFSchema> {
        DFSchema::try_from(self)
    }
}
impl ToDFSchema for Vec<Field> {
fn to_dfschema(self) -> Result<DFSchema> {
let field_count = self.len();
let schema = Schema {
fields: self.into(),
metadata: HashMap::new(),
};
let dfschema = DFSchema {
inner: schema.into(),
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
};
Ok(dfschema)
}
}
impl Display for DFSchema {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"fields:[{}], metadata:{:?}",
self.iter()
.map(|(q, f)| qualified_name(q, f.name()))
.collect::<Vec<String>>()
.join(", "),
self.inner.metadata
)
}
}
/// Column-oriented view of a schema used by expression type inference.
/// Every accessor delegates to [`Self::field_from_column`], the only
/// required method.
pub trait ExprSchema: std::fmt::Debug {
    /// Whether `col` is nullable.
    fn nullable(&self, col: &Column) -> Result<bool> {
        Ok(self.field_from_column(col)?.is_nullable())
    }
    /// The data type of `col`.
    fn data_type(&self, col: &Column) -> Result<&DataType> {
        Ok(self.field_from_column(col)?.data_type())
    }
    /// The field-level metadata of `col`.
    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
        Ok(self.field_from_column(col)?.metadata())
    }
    /// Data type and nullability of `col` in a single lookup.
    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
        let field = self.field_from_column(col)?;
        Ok((field.data_type(), field.is_nullable()))
    }
    /// Resolves `col` to its field, or errors if absent/ambiguous.
    fn field_from_column(&self, col: &Column) -> Result<&Field>;
}
/// Blanket impl: anything that can borrow a `DFSchema` (e.g. `Arc<DFSchema>`)
/// is itself an `ExprSchema`, forwarding every method to the borrowed schema.
impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
    fn nullable(&self, col: &Column) -> Result<bool> {
        self.as_ref().nullable(col)
    }
    fn data_type(&self, col: &Column) -> Result<&DataType> {
        self.as_ref().data_type(col)
    }
    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
        // Fully qualified call disambiguates from `DFSchema::metadata`.
        ExprSchema::metadata(self.as_ref(), col)
    }
    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
        self.as_ref().data_type_and_nullable(col)
    }
    fn field_from_column(&self, col: &Column) -> Result<&Field> {
        self.as_ref().field_from_column(col)
    }
}
impl ExprSchema for DFSchema {
    /// Resolves `col` with its qualifier when present, otherwise by
    /// unqualified name (which may report ambiguity).
    fn field_from_column(&self, col: &Column) -> Result<&Field> {
        if let Some(relation) = &col.relation {
            self.field_with_qualified_name(relation, &col.name)
        } else {
            self.field_with_unqualified_name(&col.name)
        }
    }
}
/// Extra comparison helpers on the plain Arrow [`Schema`].
/// Note the naming mirror of the `DFSchema` methods but with different
/// signatures: here `equivalent_names_and_types` returns `bool` and
/// `logically_equivalent_names_and_types` returns a `Result`.
pub trait SchemaExt {
    /// True when names and *semantically* equal types match pairwise.
    fn equivalent_names_and_types(&self, other: &Self) -> bool;
    /// Ok when names match and types are logically equal *or* castable.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
impl SchemaExt for Schema {
    /// Pairwise name + semantic-type comparison; `false` on any length
    /// or field mismatch.
    fn equivalent_names_and_types(&self, other: &Self) -> bool {
        if self.fields().len() != other.fields().len() {
            return false;
        }
        self.fields()
            .iter()
            .zip(other.fields().iter())
            .all(|(f1, f2)| {
                f1.name() == f2.name()
                    && DFSchema::datatype_is_semantically_equal(
                        f1.data_type(),
                        f2.data_type(),
                    )
            })
    }
    /// INSERT-oriented check: names must match, and each incoming type
    /// (`f2`) must be logically equal to, or castable to, the table type
    /// (`f1`). Error messages are phrased for INSERT queries.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
        if self.fields().len() != other.fields().len() {
            _plan_err!(
                "Inserting query must have the same schema length as the table. \
            Expected table schema length: {}, got: {}",
                self.fields().len(),
                other.fields().len()
            )
        } else {
            self.fields()
                .iter()
                .zip(other.fields().iter())
                .try_for_each(|(f1, f2)| {
                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
                        _plan_err!(
                            "Inserting query schema mismatch: Expected table field '{}' with type {}, \
                            but got '{}' with type {}.",
                            f1.name(),
                            f1.data_type(),
                            f2.name(),
                            f2.data_type())
                    } else {
                        Ok(())
                    }
                })
        }
    }
}
/// Renders a field's display name: `qualifier.name` when a qualifier is
/// present, otherwise just `name`.
pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
    qualifier.map_or_else(|| name.to_string(), |q| format!("{q}.{name}"))
}
#[cfg(test)]
mod tests {
use crate::assert_contains;
use super::*;
// A column name containing a dot must not be parsed as qualifier.name;
// the error should suggest quoting it.
#[test]
fn qualifier_in_name() -> Result<()> {
    let col = Column::from_name("t1.c0");
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let err = schema.index_of_column(&col).unwrap_err();
    let expected = "Schema error: No field named \"t1.c0\". \
    Column names are case sensitive. \
    You can use double quotes to refer to the \"\"t1.c0\"\" column \
    or set the datafusion.sql_parser.enable_ident_normalization configuration. \
    Did you mean 't1.c0'?.";
    assert_eq!(err.strip_backtrace(), expected);
    Ok(())
}
// Error listings quote field names that need quoting (capitals, dots).
#[test]
fn quoted_qualifiers_in_name() -> Result<()> {
    let col = Column::from_name("t1.c0");
    let schema = DFSchema::try_from_qualified_schema(
        "t1",
        &Schema::new(vec![
            Field::new("CapitalColumn", DataType::Boolean, true),
            Field::new("field.with.period", DataType::Boolean, true),
        ]),
    )?;
    let err = schema.index_of_column(&col).unwrap_err();
    let expected = "Schema error: No field named \"t1.c0\". \
    Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
    assert_eq!(err.strip_backtrace(), expected);
    Ok(())
}
// TryFrom<Schema> leaves all fields unqualified.
#[test]
fn from_unqualified_schema() -> Result<()> {
    let schema = DFSchema::try_from(test_schema_1())?;
    assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
    Ok(())
}
// try_from_qualified_schema applies one qualifier to every field.
#[test]
fn from_qualified_schema() -> Result<()> {
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
    Ok(())
}
// Per-field qualifiers: a mix of qualified and unqualified fields.
#[test]
fn test_from_field_specific_qualified_schema() -> Result<()> {
    let schema = DFSchema::from_field_specific_qualified_schema(
        vec![Some("t1".into()), None],
        &Arc::new(Schema::new(vec![
            Field::new("c0", DataType::Boolean, true),
            Field::new("c1", DataType::Boolean, true),
        ])),
    )?;
    assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
    Ok(())
}
// new_with_metadata accepts (qualifier, field) pairs directly.
#[test]
fn test_from_qualified_fields() -> Result<()> {
    let schema = DFSchema::new_with_metadata(
        vec![
            (
                Some("t0".into()),
                Arc::new(Field::new("c0", DataType::Boolean, true)),
            ),
            (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
        ],
        HashMap::new(),
    )?;
    assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
    Ok(())
}
// The Arrow view of a qualified DFSchema does not carry qualifiers.
#[test]
fn from_qualified_schema_into_arrow_schema() -> Result<()> {
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let arrow_schema = schema.as_arrow();
    insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
    Ok(())
}
// Joining two differently-qualified schemas keeps both qualified copies;
// unqualified lookups on shared names become errors.
#[test]
fn join_qualified() -> Result<()> {
    let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
    let join = left.join(&right)?;
    assert_eq!(
        "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
        join.to_string()
    );
    assert!(join
        .field_with_qualified_name(&TableReference::bare("t1"), "c0")
        .is_ok());
    assert!(join
        .field_with_qualified_name(&TableReference::bare("t2"), "c0")
        .is_ok());
    assert!(join.field_with_unqualified_name("c0").is_err());
    assert!(join.field_with_unqualified_name("t1.c0").is_err());
    assert!(join.field_with_unqualified_name("t2.c0").is_err());
    Ok(())
}
// Joining a schema with itself (same qualifier) must be rejected.
#[test]
fn join_qualified_duplicate() -> Result<()> {
    let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let join = left.join(&right);
    assert_eq!(
        join.unwrap_err().strip_backtrace(),
        "Schema error: Schema contains duplicate qualified field name t1.c0",
    );
    Ok(())
}
// Duplicate unqualified names are rejected as well.
#[test]
fn join_unqualified_duplicate() -> Result<()> {
    let left = DFSchema::try_from(test_schema_1())?;
    let right = DFSchema::try_from(test_schema_1())?;
    let join = left.join(&right);
    assert_eq!(
        join.unwrap_err().strip_backtrace(),
        "Schema error: Schema contains duplicate unqualified field name c0"
    );
    Ok(())
}
#[test]
fn join_mixed() -> Result<()> {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from(test_schema_2())?;
let join = left.join(&right)?;
assert_eq!(
"fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
join.to_string()
);
assert!(join
.field_with_qualified_name(&TableReference::bare("t1"), "c0")
.is_ok());
assert!(join.field_with_unqualified_name("c0").is_ok());
assert!(join.field_with_unqualified_name("c100").is_ok());
assert!(join.field_with_name(None, "c100").is_ok());
assert!(join.field_with_unqualified_name("t1.c0").is_err());
assert!(join.field_with_unqualified_name("t1.c100").is_err());
assert!(join
.field_with_qualified_name(&TableReference::bare(""), "c100")
.is_err());
Ok(())
}
#[test]
fn join_mixed_duplicate() -> Result<()> {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from(test_schema_1())?;
let join = left.join(&right);
assert_contains!(join.unwrap_err().to_string(),
"Schema error: Schema contains qualified \
field name t1.c0 and unqualified field name c0 which would be ambiguous");
Ok(())
}
#[test]
fn helpful_error_messages() -> Result<()> {
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let expected_help = "Valid fields are t1.c0, t1.c1.";
assert_contains!(
schema
.field_with_qualified_name(&TableReference::bare("x"), "y")
.unwrap_err()
.to_string(),
expected_help
);
assert_contains!(
schema
.field_with_unqualified_name("y")
.unwrap_err()
.to_string(),
expected_help
);
assert!(schema.index_of_column_by_name(None, "y").is_none());
assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
Ok(())
}
#[test]
fn select_without_valid_fields() {
    // Lookups against an empty schema fail with a "No field named" error
    // for both qualified and unqualified column references.
    let schema = DFSchema::empty();
    let col = Column::from_qualified_name("t1.c0");
    let err = schema.index_of_column(&col).unwrap_err();
    let expected = "Schema error: No field named t1.c0.";
    assert_eq!(err.strip_backtrace(), expected);
    let col = Column::from_name("c0");
    // use `unwrap_err()` consistently (was `.err().unwrap()`)
    let err = schema.index_of_column(&col).unwrap_err();
    let expected = "Schema error: No field named c0.";
    assert_eq!(err.strip_backtrace(), expected);
}
// Verify the ToDFSchema conversions (`to_dfschema` / `to_dfschema_ref`) from
// both an owned `Schema` and a `SchemaRef`, preserving fields and metadata.
#[test]
fn into() {
let arrow_schema = Schema::new_with_metadata(
vec![Field::new("c0", DataType::Int64, true)],
test_metadata(),
);
let arrow_schema_ref = Arc::new(arrow_schema.clone());
// Expected DFSchema built by hand: same Arrow schema, no qualifiers.
let df_schema = DFSchema {
inner: Arc::clone(&arrow_schema_ref),
field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
let df_schema_ref = Arc::new(df_schema.clone());
// Owned-value conversions (clones shadow the outer bindings so the
// originals stay available for the assertions below).
{
let arrow_schema = arrow_schema.clone();
let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
}
// Same conversions, but producing an Arc'd DFSchemaRef.
{
let arrow_schema = arrow_schema.clone();
let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
}
// Conversions from the original (un-shadowed) values also work.
assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
}
/// Two-column boolean schema (c0, c1) used by the join/lookup tests.
fn test_schema_1() -> Schema {
    let fields = vec![
        Field::new("c0", DataType::Boolean, true),
        Field::new("c1", DataType::Boolean, true),
    ];
    Schema::new(fields)
}
#[test]
fn test_dfschema_to_schema_conversion() {
    // Schema-level metadata must be visible through the wrapping DFSchema.
    let field_metadata: HashMap<String, String> =
        [("key".to_string(), "value".to_string())].into();
    let a_field =
        Field::new("a", DataType::Int64, false).with_metadata(field_metadata.clone());
    let b_field =
        Field::new("b", DataType::Int64, false).with_metadata(field_metadata);
    let schema = Arc::new(Schema::new(vec![a_field, b_field]));
    let df_schema = DFSchema {
        inner: Arc::clone(&schema),
        field_qualifiers: vec![None; schema.fields.len()],
        functional_dependencies: FunctionalDependencies::empty(),
    };
    assert_eq!(df_schema.inner.metadata(), schema.metadata())
}
#[test]
fn test_contain_column() -> Result<()> {
    // `is_column_from_schema` should accept both qualified and unqualified
    // references to existing columns and reject unknown ones.
    // Build the schema once instead of re-creating the identical schema
    // for every case (the original repeated the construction four times).
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    // qualified reference, column exists
    assert!(schema.is_column_from_schema(&Column::from_qualified_name("t1.c0")));
    // qualified reference, column missing
    assert!(!schema.is_column_from_schema(&Column::from_qualified_name("t1.c2")));
    // unqualified reference, column exists
    assert!(schema.is_column_from_schema(&Column::from_name("c0")));
    // unqualified reference, column missing
    assert!(!schema.is_column_from_schema(&Column::from_name("c2")));
    Ok(())
}
// Exercise `DFSchema::datatype_is_logically_equal` across scalar, list,
// map, and struct types.
#[test]
fn test_datatype_is_logically_equal() {
// Identical scalar types are equal; different widths are not.
assert!(DFSchema::datatype_is_logically_equal(
&DataType::Int8,
&DataType::Int8
));
assert!(!DFSchema::datatype_is_logically_equal(
&DataType::Int8,
&DataType::Int16
));
// Lists: element field name and nullability are ignored...
assert!(DFSchema::datatype_is_logically_equal(
&DataType::List(Field::new_list_field(DataType::Int8, true).into()),
&DataType::List(Field::new("element", DataType::Int8, false).into())
));
// ...but the element type itself must match.
assert!(!DFSchema::datatype_is_logically_equal(
&DataType::List(Field::new_list_field(DataType::Int8, true).into()),
&DataType::List(Field::new_list_field(DataType::Int16, true).into())
));
// Baseline map type for the map comparisons below.
let map_field = DataType::Map(
Field::new(
"entries",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Int8, false),
Field::new("value", DataType::Int8, true),
])),
true,
)
.into(),
true,
);
// Maps: entry/key/value field names and nullability are ignored...
assert!(DFSchema::datatype_is_logically_equal(
&map_field,
&DataType::Map(
Field::new(
"pairs",
DataType::Struct(Fields::from(vec![
Field::new("one", DataType::Int8, false),
Field::new("two", DataType::Int8, false)
])),
true
)
.into(),
true
)
));
// ...but a differing value type breaks equality...
assert!(!DFSchema::datatype_is_logically_equal(
&map_field,
&DataType::Map(
Field::new(
"entries",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Int8, false),
Field::new("value", DataType::Int16, true)
])),
true
)
.into(),
true
)
));
// ...as does a differing key type.
assert!(!DFSchema::datatype_is_logically_equal(
&map_field,
&DataType::Map(
Field::new(
"entries",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Int16, false),
Field::new("value", DataType::Int8, true)
])),
true
)
.into(),
true
)
));
// Baseline struct type for the struct comparisons below.
let struct_field = DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Int8, true),
Field::new("b", DataType::Int8, true),
]));
// Structs: field nullability is ignored...
assert!(DFSchema::datatype_is_logically_equal(
&struct_field,
&DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Int8, false),
Field::new("b", DataType::Int8, true),
]))
));
// ...but field names matter...
assert!(!DFSchema::datatype_is_logically_equal(
&struct_field,
&DataType::Struct(Fields::from(vec![
Field::new("x", DataType::Int8, true),
Field::new("y", DataType::Int8, true),
]))
));
// ...field types matter...
assert!(!DFSchema::datatype_is_logically_equal(
&struct_field,
&DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Int16, true),
Field::new("b", DataType::Int8, true),
]))
));
// ...and the field count matters.
assert!(!DFSchema::datatype_is_logically_equal(
&struct_field,
&DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
));
}
#[test]
fn test_datatype_is_logically_equivalent_to_dictionary() {
    // A dictionary-encoded Utf8 is logically the same type as plain Utf8.
    let dict =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert!(DFSchema::datatype_is_logically_equal(&DataType::Utf8, &dict));
}
// Exercise `DFSchema::datatype_is_semantically_equal` across scalar,
// decimal, timestamp, list, map, and struct types.
#[test]
fn test_datatype_is_semantically_equal() {
    // Identical scalar types are equal; different widths are not.
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::Int8,
        &DataType::Int8
    ));
    assert!(!DFSchema::datatype_is_semantically_equal(
        &DataType::Int8,
        &DataType::Int16
    ));
    // Decimals of the same family are semantically equal even when
    // precision and scale differ.
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::Decimal32(1, 2),
        &DataType::Decimal32(2, 1),
    ));
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::Decimal64(1, 2),
        &DataType::Decimal64(2, 1),
    ));
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::Decimal128(1, 2),
        &DataType::Decimal128(2, 1),
    ));
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::Decimal256(1, 2),
        &DataType::Decimal256(2, 1),
    ));
    // Timestamps are semantically equal regardless of unit and timezone.
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::Timestamp(
            arrow::datatypes::TimeUnit::Microsecond,
            Some("UTC".into())
        ),
        &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
    ));
    // Lists: element field name and nullability are ignored, but the
    // element type must match.
    assert!(DFSchema::datatype_is_semantically_equal(
        &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
        &DataType::List(Field::new("element", DataType::Int8, false).into())
    ));
    assert!(!DFSchema::datatype_is_semantically_equal(
        &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
        &DataType::List(Field::new_list_field(DataType::Int16, true).into())
    ));
    // Baseline map type for the map comparisons below.
    let map_field = DataType::Map(
        Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Int8, false),
                Field::new("value", DataType::Int8, true),
            ])),
            true,
        )
        .into(),
        true,
    );
    // Maps: entry/key/value field names and nullability are ignored...
    assert!(DFSchema::datatype_is_semantically_equal(
        &map_field,
        &DataType::Map(
            Field::new(
                "pairs",
                DataType::Struct(Fields::from(vec![
                    Field::new("one", DataType::Int8, false),
                    Field::new("two", DataType::Int8, false)
                ])),
                true
            )
            .into(),
            true
        )
    ));
    // ...but a differing value type breaks equality...
    assert!(!DFSchema::datatype_is_semantically_equal(
        &map_field,
        &DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int8, false),
                    Field::new("value", DataType::Int16, true)
                ])),
                true
            )
            .into(),
            true
        )
    ));
    // ...as does a differing key type.
    assert!(!DFSchema::datatype_is_semantically_equal(
        &map_field,
        &DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int16, false),
                    Field::new("value", DataType::Int8, true)
                ])),
                true
            )
            .into(),
            true
        )
    ));
    // Baseline struct type for the struct comparisons below.
    let struct_field = DataType::Struct(Fields::from(vec![
        Field::new("a", DataType::Int8, true),
        Field::new("b", DataType::Int8, true),
    ]));
    // BUGFIX: the following four assertions previously called
    // `datatype_is_logically_equal` (copy-paste from the logical test),
    // leaving the semantic comparison of structs untested.
    // Structs: field nullability is ignored...
    assert!(DFSchema::datatype_is_semantically_equal(
        &struct_field,
        &DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int8, false),
            Field::new("b", DataType::Int8, true),
        ]))
    ));
    // ...but field names matter...
    assert!(!DFSchema::datatype_is_semantically_equal(
        &struct_field,
        &DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Int8, true),
            Field::new("y", DataType::Int8, true),
        ]))
    ));
    // ...field types matter...
    assert!(!DFSchema::datatype_is_semantically_equal(
        &struct_field,
        &DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("b", DataType::Int8, true),
        ]))
    ));
    // ...and the field count matters.
    assert!(!DFSchema::datatype_is_semantically_equal(
        &struct_field,
        &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
    ));
}
#[test]
fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
    // Unlike logical equality, semantic equality distinguishes a
    // dictionary-encoded Utf8 from plain Utf8.
    let dict =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert!(!DFSchema::datatype_is_semantically_equal(
        &DataType::Utf8,
        &dict
    ));
}
/// Two-column boolean schema (c100, c101) joined against `test_schema_1`.
fn test_schema_2() -> Schema {
    let fields = vec![
        Field::new("c100", DataType::Boolean, true),
        Field::new("c101", DataType::Boolean, true),
    ];
    Schema::new(fields)
}
/// Default test metadata: two entries, `k0 -> v0` and `k1 -> v1`.
fn test_metadata() -> HashMap<String, String> {
test_metadata_n(2)
}
/// Build `n` metadata entries of the form `k{i} -> v{i}` for `i in 0..n`.
fn test_metadata_n(n: usize) -> HashMap<String, String> {
    let mut metadata = HashMap::with_capacity(n);
    for i in 0..n {
        metadata.insert(format!("k{i}"), format!("v{i}"));
    }
    metadata
}
// Verify `tree_string()` output for a flat, fully-unqualified schema.
// Left byte-identical: the insta inline snapshot literal is
// whitespace-sensitive and must match the recorded output exactly.
#[test]
fn test_print_schema_unqualified() {
let schema = DFSchema::from_unqualified_fields(
vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
Field::new("age", DataType::Int64, true),
Field::new("active", DataType::Boolean, false),
]
.into(),
HashMap::new(),
)
.unwrap();
let output = schema.tree_string();
insta::assert_snapshot!(output, @r"
root
|-- id: int32 (nullable = false)
|-- name: utf8 (nullable = true)
|-- age: int64 (nullable = true)
|-- active: boolean (nullable = false)
");
}
// Verify that `tree_string()` prefixes field names with their table
// qualifier when the schema is qualified.
#[test]
fn test_print_schema_qualified() {
let schema = DFSchema::try_from_qualified_schema(
"table1",
&Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
]),
)
.unwrap();
let output = schema.tree_string();
insta::assert_snapshot!(output, @r"
root
|-- table1.id: int32 (nullable = false)
|-- table1.name: utf8 (nullable = true)
");
}
// Verify `tree_string()` rendering of nested types: struct children and
// list items appear as indented sub-entries under their parent field.
#[test]
fn test_print_schema_complex_types() {
// struct with two utf8 children
let struct_field = Field::new(
"address",
DataType::Struct(Fields::from(vec![
Field::new("street", DataType::Utf8, true),
Field::new("city", DataType::Utf8, true),
])),
true,
);
// list of utf8 items
let list_field = Field::new(
"tags",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
true,
);
let schema = DFSchema::from_unqualified_fields(
vec![
Field::new("id", DataType::Int32, false),
struct_field,
list_field,
Field::new("score", DataType::Decimal128(10, 2), true),
]
.into(),
HashMap::new(),
)
.unwrap();
let output = schema.tree_string();
insta::assert_snapshot!(output, @r"
root
|-- id: int32 (nullable = false)
|-- address: struct (nullable = true)
| |-- street: utf8 (nullable = true)
| |-- city: utf8 (nullable = true)
|-- tags: list (nullable = true)
| |-- item: utf8 (nullable = true)
|-- score: decimal128(10, 2) (nullable = true)
");
}
#[test]
fn test_print_schema_empty() {
    // An empty schema renders as just the root marker, with no fields.
    let rendered = DFSchema::empty().tree_string();
    insta::assert_snapshot!(rendered, @r###"root"###);
}
// Verify `tree_string()` on deeply nested types (struct, list-of-struct,
// map with list values, timestamp with timezone). Map entries are
// flattened to key/value sub-entries in the rendered tree.
#[test]
fn test_print_schema_deeply_nested_types() {
// struct with two scalar children
let inner_struct = Field::new(
"inner",
DataType::Struct(Fields::from(vec![
Field::new("level1", DataType::Utf8, true),
Field::new("level2", DataType::Int32, false),
])),
true,
);
// list whose items are structs
let nested_list = Field::new(
"nested_list",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(Fields::from(vec![
Field::new("id", DataType::Int64, false),
Field::new("value", DataType::Float64, true),
])),
true,
))),
true,
);
// map from utf8 keys to list-of-int32 values
let map_field = Field::new(
"map_data",
DataType::Map(
Arc::new(Field::new(
"entries",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new(
"value",
DataType::List(Arc::new(Field::new(
"item",
DataType::Int32,
true,
))),
true,
),
])),
false,
)),
false,
),
true,
);
let schema = DFSchema::from_unqualified_fields(
vec![
Field::new("simple_field", DataType::Utf8, true),
inner_struct,
nested_list,
map_field,
Field::new(
"timestamp_field",
DataType::Timestamp(
arrow::datatypes::TimeUnit::Microsecond,
Some("UTC".into()),
),
false,
),
]
.into(),
HashMap::new(),
)
.unwrap();
let output = schema.tree_string();
insta::assert_snapshot!(output, @r"
root
|-- simple_field: utf8 (nullable = true)
|-- inner: struct (nullable = true)
| |-- level1: utf8 (nullable = true)
| |-- level2: int32 (nullable = false)
|-- nested_list: list (nullable = true)
| |-- item: struct (nullable = true)
| | |-- id: int64 (nullable = false)
| | |-- value: float64 (nullable = true)
|-- map_data: map (nullable = true)
| |-- key: utf8 (nullable = false)
| |-- value: list (nullable = true)
| | |-- item: int32 (nullable = true)
|-- timestamp_field: timestamp (UTC) (nullable = false)
");
}
// Verify `tree_string()` on a schema mixing qualified and unqualified
// fields: only fields with a qualifier get the `table.` prefix.
#[test]
fn test_print_schema_mixed_qualified_unqualified() {
let schema = DFSchema::new_with_metadata(
vec![
(
Some("table1".into()),
Arc::new(Field::new("id", DataType::Int32, false)),
),
(None, Arc::new(Field::new("name", DataType::Utf8, true))),
(
Some("table2".into()),
Arc::new(Field::new("score", DataType::Float64, true)),
),
(
None,
Arc::new(Field::new("active", DataType::Boolean, false)),
),
],
HashMap::new(),
)
.unwrap();
let output = schema.tree_string();
insta::assert_snapshot!(output, @r"
root
|-- table1.id: int32 (nullable = false)
|-- name: utf8 (nullable = true)
|-- table2.score: float64 (nullable = true)
|-- active: boolean (nullable = false)
");
}
// Verify `tree_string()` on a list whose items are maps: the map's
// key/value entries render nested under the list item.
#[test]
fn test_print_schema_array_of_map() {
// map entry struct: utf8 key -> utf8 value
let map_field = Field::new(
"entries",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, false),
])),
false,
);
// list of maps
let array_of_map_field = Field::new(
"array_map_field",
DataType::List(Arc::new(Field::new(
"item",
DataType::Map(Arc::new(map_field), false),
false,
))),
false,
);
let schema = DFSchema::from_unqualified_fields(
vec![array_of_map_field].into(),
HashMap::new(),
)
.unwrap();
let output = schema.tree_string();
insta::assert_snapshot!(output, @r"
root
|-- array_map_field: list (nullable = false)
| |-- item: map (nullable = false)
| | |-- key: utf8 (nullable = false)
| | |-- value: utf8 (nullable = false)
");
}
// Verify `tree_string()` on combinations of nested container types:
// list-of-struct, struct-of-lists, map with struct values, list-of-map,
// and a struct > list > struct > map nesting four levels deep.
#[test]
fn test_print_schema_complex_type_combinations() {
// list whose items are three-field structs
let list_of_structs = Field::new(
"list_of_structs",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(Fields::from(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
Field::new("score", DataType::Float64, true),
])),
true,
))),
true,
);
// struct containing two list fields and a scalar
let struct_with_lists = Field::new(
"struct_with_lists",
DataType::Struct(Fields::from(vec![
Field::new(
"tags",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
true,
),
Field::new(
"scores",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
false,
),
Field::new("metadata", DataType::Utf8, true),
])),
false,
);
// map from utf8 keys to struct values
let map_with_struct_values = Field::new(
"map_with_struct_values",
DataType::Map(
Arc::new(Field::new(
"entries",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new(
"value",
DataType::Struct(Fields::from(vec![
Field::new("count", DataType::Int64, false),
Field::new("active", DataType::Boolean, true),
])),
true,
),
])),
false,
)),
false,
),
true,
);
// list whose items are utf8 -> int32 maps
let list_of_maps = Field::new(
"list_of_maps",
DataType::List(Arc::new(Field::new(
"item",
DataType::Map(
Arc::new(Field::new(
"entries",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Int32, true),
])),
false,
)),
false,
),
true,
))),
true,
);
// struct > list > struct > map, four levels deep
let deeply_nested = Field::new(
"deeply_nested",
DataType::Struct(Fields::from(vec![
Field::new("level1", DataType::Utf8, true),
Field::new(
"level2",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(Fields::from(vec![
Field::new("id", DataType::Int32, false),
Field::new(
"properties",
DataType::Map(
Arc::new(Field::new(
"entries",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Float64, true),
])),
false,
)),
false,
),
true,
),
])),
true,
))),
false,
),
])),
true,
);
let schema = DFSchema::from_unqualified_fields(
vec![
list_of_structs,
struct_with_lists,
map_with_struct_values,
list_of_maps,
deeply_nested,
]
.into(),
HashMap::new(),
)
.unwrap();
let output = schema.tree_string();
insta::assert_snapshot!(output, @r"
root
|-- list_of_structs: list (nullable = true)
| |-- item: struct (nullable = true)
| | |-- id: int32 (nullable = false)
| | |-- name: utf8 (nullable = true)
| | |-- score: float64 (nullable = true)
|-- struct_with_lists: struct (nullable = false)
| |-- tags: list (nullable = true)
| | |-- item: utf8 (nullable = true)
| |-- scores: list (nullable = false)
| | |-- item: int32 (nullable = true)
| |-- metadata: utf8 (nullable = true)
|-- map_with_struct_values: map (nullable = true)
| |-- key: utf8 (nullable = false)
| |-- value: struct (nullable = true)
| | |-- count: int64 (nullable = false)
| | |-- active: boolean (nullable = true)
|-- list_of_maps: list (nullable = true)
| |-- item: map (nullable = true)
| | |-- key: utf8 (nullable = false)
| | |-- value: int32 (nullable = false)
|-- deeply_nested: struct (nullable = true)
| |-- level1: utf8 (nullable = true)
| |-- level2: list (nullable = false)
| | |-- item: struct (nullable = true)
| | | |-- id: int32 (nullable = false)
| | | |-- properties: map (nullable = true)
| | | | |-- key: utf8 (nullable = false)
| | | | |-- value: float64 (nullable = false)
");
}
// Verify `tree_string()` rendering of less common types: null, binary
// variants, fixed-size types, all decimal widths, dates, and times.
#[test]
fn test_print_schema_edge_case_types() {
let schema = DFSchema::from_unqualified_fields(
vec![
Field::new("null_field", DataType::Null, true),
Field::new("binary_field", DataType::Binary, false),
Field::new("large_binary", DataType::LargeBinary, true),
Field::new("large_utf8", DataType::LargeUtf8, false),
Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
Field::new(
"fixed_size_list",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Int32, true)),
5,
),
false,
),
Field::new("decimal32", DataType::Decimal32(9, 4), true),
Field::new("decimal64", DataType::Decimal64(9, 4), true),
Field::new("decimal128", DataType::Decimal128(18, 4), true),
Field::new("decimal256", DataType::Decimal256(38, 10), false),
Field::new("date32", DataType::Date32, true),
Field::new("date64", DataType::Date64, false),
Field::new(
"time32_seconds",
DataType::Time32(arrow::datatypes::TimeUnit::Second),
true,
),
Field::new(
"time64_nanoseconds",
DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
false,
),
]
.into(),
HashMap::new(),
)
.unwrap();
let output = schema.tree_string();
// Note from the snapshot: decimals print precision/scale, while
// time32/time64 print without their unit.
insta::assert_snapshot!(output, @r"
root
|-- null_field: null (nullable = true)
|-- binary_field: binary (nullable = false)
|-- large_binary: large_binary (nullable = true)
|-- large_utf8: large_utf8 (nullable = false)
|-- fixed_size_binary: fixed_size_binary (nullable = true)
|-- fixed_size_list: fixed size list (nullable = false)
| |-- item: int32 (nullable = true)
|-- decimal32: decimal32(9, 4) (nullable = true)
|-- decimal64: decimal64(9, 4) (nullable = true)
|-- decimal128: decimal128(18, 4) (nullable = true)
|-- decimal256: decimal256(38, 10) (nullable = false)
|-- date32: date32 (nullable = true)
|-- date64: date64 (nullable = false)
|-- time32_seconds: time32 (nullable = true)
|-- time64_nanoseconds: time64 (nullable = false)
");
}
}