pub mod scalar;
use std::collections::HashMap;
use std::sync::Arc;
use itertools::Itertools;
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef, TimeUnit,
};
use crate::arrow::error::ArrowError;
use crate::error::Error;
use crate::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use crate::schema::{
ArrayType, ColumnMetadataKey, DataType, MapType, MetadataValue, PrimitiveType, StructField,
StructType,
};
/// Name given to the element child field of Arrow lists built by this module; also the path
/// segment appended when looking up the nested field id of a list element.
pub(crate) const LIST_ARRAY_ROOT: &str = "element";
/// Name given to the entries struct field of Arrow maps built by this module.
pub(crate) const MAP_ROOT_DEFAULT: &str = "key_value";
/// Name given to the key child of a map's entries struct; also the path segment appended when
/// looking up the nested field id of a map key.
pub(crate) const MAP_KEY_DEFAULT: &str = "key";
/// Name given to the value child of a map's entries struct; also the path segment appended when
/// looking up the nested field id of a map value.
pub(crate) const MAP_VALUE_DEFAULT: &str = "value";
/// Converts the metadata attached directly to a kernel field into the string-to-string map
/// that Arrow fields carry.
///
/// The kernel parquet field id key is renamed to [`PARQUET_FIELD_ID_META_KEY`]; every other
/// key is copied unchanged. String values are copied verbatim, all other values are
/// serialized to their JSON text.
///
/// # Errors
///
/// Returns [`ArrowError::JsonError`] when a non-string value cannot be serialized.
pub(crate) fn kernel_flat_parquet_id_to_arrow_metadata(
    field: &StructField,
) -> Result<HashMap<String, String>, ArrowError> {
    let mut arrow_metadata = HashMap::new();
    for (key, val) in field.metadata().iter() {
        let arrow_key = if key == ColumnMetadataKey::ParquetFieldId.as_ref() {
            PARQUET_FIELD_ID_META_KEY.to_string()
        } else {
            key.clone()
        };
        let arrow_value = match val {
            MetadataValue::String(s) => s.clone(),
            other => serde_json::to_string(other)
                .map_err(|e| ArrowError::JsonError(e.to_string()))?,
        };
        arrow_metadata.insert(arrow_key, arrow_value);
    }
    Ok(arrow_metadata)
}
/// Looks up the field id recorded for `path` in the nested-ids metadata of `field`.
///
/// Returns `Ok(None)` when the field has no nested-ids metadata entry, or when that entry
/// holds no value for `path`.
///
/// # Errors
///
/// Returns [`ArrowError::SchemaError`] when the nested-ids metadata value is not a JSON
/// object, or when the value stored for `path` is not an integer.
pub(crate) fn lookup_nested_field_id(
    field: &StructField,
    path: &str,
) -> Result<Option<i64>, ArrowError> {
    let key = ColumnMetadataKey::ColumnMappingNestedIds.as_ref();
    let obj = match field.metadata().get(key) {
        None => return Ok(None),
        Some(MetadataValue::Other(serde_json::Value::Object(obj))) => obj,
        Some(_) => {
            return Err(ArrowError::SchemaError(format!(
                "'{key}' on field '{}' must be a JSON object",
                field.name()
            )))
        }
    };
    match obj.get(path) {
        None => Ok(None),
        Some(entry) => match entry.as_i64() {
            Some(id) => Ok(Some(id)),
            None => Err(ArrowError::SchemaError(format!(
                "'{key}['{path}']' on field '{}' must be an integer, got {entry}",
                field.name()
            ))),
        },
    }
}
/// Builds Arrow field metadata holding at most one entry: the parquet field id.
///
/// Returns an empty map when `id` is `None`; otherwise the map contains
/// [`PARQUET_FIELD_ID_META_KEY`] mapped to the decimal text of the id.
pub(crate) fn parquet_field_id_metadata(id: Option<i64>) -> HashMap<String, String> {
    id.into_iter()
        .map(|field_id| (PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()))
        .collect()
}
/// Fallible conversion of `self` into the Arrow type `ArrowType`.
///
/// A blanket impl in this module provides this trait for every type `K` for which
/// `ArrowType: TryFromKernel<K>` holds.
pub trait TryIntoArrow<ArrowType> {
/// Consumes `self` and attempts to produce the Arrow value, reporting failure as an
/// [`ArrowError`].
fn try_into_arrow(self) -> Result<ArrowType, ArrowError>;
}
/// Fallible construction of `Self` from a value of the Arrow type `ArrowType`.
pub trait TryFromArrow<ArrowType>: Sized {
/// Attempts to build `Self` from `t`, reporting failure as an [`ArrowError`].
fn try_from_arrow(t: ArrowType) -> Result<Self, ArrowError>;
}
/// Fallible conversion of `self` into the kernel type `KernelType`.
///
/// A blanket impl in this module provides this trait for every type `A` for which
/// `KernelType: TryFromArrow<A>` holds.
pub trait TryIntoKernel<KernelType> {
/// Consumes `self` and attempts to produce the kernel value, reporting failure as an
/// [`ArrowError`].
fn try_into_kernel(self) -> Result<KernelType, ArrowError>;
}
/// Fallible construction of `Self` from a value of the kernel type `KernelType`.
pub trait TryFromKernel<KernelType>: Sized {
/// Attempts to build `Self` from `t`, reporting failure as an [`ArrowError`].
fn try_from_kernel(t: KernelType) -> Result<Self, ArrowError>;
}
/// Blanket impl: any type that some Arrow type can be built from via [`TryFromKernel`]
/// automatically gets the method-call form `value.try_into_arrow()`.
impl<KernelType, ArrowType> TryIntoArrow<ArrowType> for KernelType
where
ArrowType: TryFromKernel<KernelType>,
{
fn try_into_arrow(self) -> Result<ArrowType, ArrowError> {
// Pure delegation; the conversion logic lives in the `TryFromKernel` impl.
ArrowType::try_from_kernel(self)
}
}
/// Blanket impl: any type that some kernel type can be built from via [`TryFromArrow`]
/// automatically gets the method-call form `value.try_into_kernel()`.
impl<KernelType, ArrowType> TryIntoKernel<KernelType> for ArrowType
where
KernelType: TryFromArrow<ArrowType>,
{
fn try_into_kernel(self) -> Result<KernelType, ArrowError> {
// Pure delegation; the conversion logic lives in the `TryFromArrow` impl.
KernelType::try_from_arrow(self)
}
}
fn try_kernel_struct_to_arrow_fields(s: &StructType) -> Result<Vec<ArrowField>, ArrowError> {
s.fields().map(|f| f.try_into_arrow()).try_collect()
}
/// Builds an Arrow schema whose fields are the converted fields of the kernel struct.
impl TryFromKernel<&StructType> for ArrowSchema {
    fn try_from_kernel(s: &StructType) -> Result<Self, ArrowError> {
        let arrow_fields = try_kernel_struct_to_arrow_fields(s)?;
        Ok(ArrowSchema::new(arrow_fields))
    }
}
/// Converts a kernel struct field into an Arrow field with the same name and nullability.
///
/// The field's metadata is carried over (with the parquet field id key renamed), except for
/// the nested-ids entry, which is dropped from the Arrow metadata. The field itself is passed
/// to `kernel_field_into_arrow` as the ancestor for nested field id lookups on list and map
/// children.
impl TryFromKernel<&StructField> for ArrowField {
    fn try_from_kernel(f: &StructField) -> Result<Self, ArrowError> {
        // Metadata is converted first so that a metadata error wins over a type error.
        let mut arrow_metadata = kernel_flat_parquet_id_to_arrow_metadata(f)?;
        arrow_metadata.remove(ColumnMetadataKey::ColumnMappingNestedIds.as_ref());

        let data_type = kernel_field_into_arrow(f, f.name(), f.data_type())?;
        let arrow_field = ArrowField::new(f.name(), data_type, f.is_nullable());
        Ok(arrow_field.with_metadata(arrow_metadata))
    }
}
/// Converts `datatype` to an Arrow type, tagging list and map children with the nested field
/// ids recorded on `ancestor`.
///
/// `relative_path` is the dotted path of `datatype` (starting at the ancestor's name); child
/// paths are formed by appending the element/key/value segment. Arrays and maps are handled
/// here so ids can be attached at every nesting level; all other types are delegated to the
/// plain `DataType` conversion.
///
/// # Errors
///
/// Propagates errors from the nested id lookup and from converting child types.
fn kernel_field_into_arrow(
    ancestor: &StructField,
    relative_path: &str,
    datatype: &DataType,
) -> Result<ArrowDataType, ArrowError> {
    // Builds a list/map child field carrying its parquet field id when one is known.
    let tagged_child = |name: &str, data_type: ArrowDataType, nullable: bool, id: Option<i64>| {
        ArrowField::new(name, data_type, nullable).with_metadata(parquet_field_id_metadata(id))
    };

    match datatype {
        DataType::Array(array) => {
            let path = format!("{relative_path}.{LIST_ARRAY_ROOT}");
            let id = lookup_nested_field_id(ancestor, &path)?;
            let element_type = kernel_field_into_arrow(ancestor, &path, array.element_type())?;
            let element = tagged_child(LIST_ARRAY_ROOT, element_type, array.contains_null(), id);
            Ok(ArrowDataType::List(Arc::new(element)))
        }
        DataType::Map(map) => {
            let key_path = format!("{relative_path}.{MAP_KEY_DEFAULT}");
            let value_path = format!("{relative_path}.{MAP_VALUE_DEFAULT}");
            // Both id lookups run before either child type is converted.
            let key_id = lookup_nested_field_id(ancestor, &key_path)?;
            let value_id = lookup_nested_field_id(ancestor, &value_path)?;
            let key_type = kernel_field_into_arrow(ancestor, &key_path, map.key_type())?;
            let value_type = kernel_field_into_arrow(ancestor, &value_path, map.value_type())?;

            // Map keys are always declared non-nullable.
            let key = tagged_child(MAP_KEY_DEFAULT, key_type, false, key_id);
            let value = tagged_child(
                MAP_VALUE_DEFAULT,
                value_type,
                map.value_contains_null(),
                value_id,
            );
            let entries = ArrowField::new(
                MAP_ROOT_DEFAULT,
                ArrowDataType::Struct(vec![key, value].into()),
                false,
            );
            Ok(ArrowDataType::Map(Arc::new(entries), false))
        }
        DataType::Struct(_) | DataType::Primitive(_) | DataType::Variant(_) => {
            datatype.try_into_arrow()
        }
    }
}
impl TryFromKernel<&ArrayType> for ArrowField {
fn try_from_kernel(a: &ArrayType) -> Result<Self, ArrowError> {
Ok(ArrowField::new(
LIST_ARRAY_ROOT,
a.element_type().try_into_arrow()?,
a.contains_null(),
))
}
}
/// Builds the Arrow entries field of a map from a kernel map type.
///
/// The result is a non-nullable struct field named [`MAP_ROOT_DEFAULT`] with a non-nullable
/// key child and a value child that is nullable exactly when the map's values may be null.
/// No field id metadata is attached here.
impl TryFromKernel<&MapType> for ArrowField {
    fn try_from_kernel(a: &MapType) -> Result<Self, ArrowError> {
        let key_type = ArrowDataType::try_from_kernel(a.key_type())?;
        let key = ArrowField::new(MAP_KEY_DEFAULT, key_type, false);

        let value_type = ArrowDataType::try_from_kernel(a.value_type())?;
        let value = ArrowField::new(MAP_VALUE_DEFAULT, value_type, a.value_contains_null());

        let entries = ArrowDataType::Struct(vec![key, value].into());
        Ok(ArrowField::new(MAP_ROOT_DEFAULT, entries, false))
    }
}
impl TryFromKernel<&DataType> for ArrowDataType {
fn try_from_kernel(t: &DataType) -> Result<Self, ArrowError> {
match t {
DataType::Primitive(p) => {
match p {
PrimitiveType::String => Ok(ArrowDataType::Utf8),
PrimitiveType::Long => Ok(ArrowDataType::Int64), PrimitiveType::Integer => Ok(ArrowDataType::Int32),
PrimitiveType::Short => Ok(ArrowDataType::Int16),
PrimitiveType::Byte => Ok(ArrowDataType::Int8),
PrimitiveType::Float => Ok(ArrowDataType::Float32),
PrimitiveType::Double => Ok(ArrowDataType::Float64),
PrimitiveType::Boolean => Ok(ArrowDataType::Boolean),
PrimitiveType::Binary => Ok(ArrowDataType::Binary),
PrimitiveType::Decimal(dtype) => Ok(ArrowDataType::Decimal128(
dtype.precision(),
dtype.scale() as i8, )),
PrimitiveType::Date => {
Ok(ArrowDataType::Date32)
}
PrimitiveType::Timestamp => Ok(ArrowDataType::Timestamp(
TimeUnit::Microsecond,
Some("UTC".into()),
)),
PrimitiveType::TimestampNtz => {
Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
}
#[cfg(feature = "nanosecond-timestamps")]
PrimitiveType::TimestampNanos => Ok(ArrowDataType::Timestamp(
TimeUnit::Nanosecond,
Some("UTC".into()),
)),
}
}
DataType::Struct(s) => Ok(ArrowDataType::Struct(
try_kernel_struct_to_arrow_fields(s)?.into(),
)),
DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(a.as_ref().try_into_arrow()?))),
DataType::Map(m) => Ok(ArrowDataType::Map(
Arc::new(m.as_ref().try_into_arrow()?),
false,
)),
DataType::Variant(s) => {
if *t == DataType::unshredded_variant() {
Ok(ArrowDataType::Struct(
try_kernel_struct_to_arrow_fields(s)?.into(),
))
} else {
Err(ArrowError::SchemaError(format!(
"Incorrect Variant Schema: {t}. Only the unshredded variant schema is supported right now."
)))
}
}
}
}
}
impl TryFromArrow<&ArrowSchema> for StructType {
fn try_from_arrow(arrow_schema: &ArrowSchema) -> Result<Self, ArrowError> {
StructType::try_from_results(
arrow_schema
.fields()
.iter()
.map(|field| field.as_ref().try_into_kernel()),
)
.map_err(|e| ArrowError::from_external_error(e.into()))
}
}
impl TryFromArrow<ArrowSchemaRef> for StructType {
fn try_from_arrow(arrow_schema: ArrowSchemaRef) -> Result<Self, ArrowError> {
arrow_schema.as_ref().try_into_kernel()
}
}
/// Converts an Arrow field into a kernel struct field with the same name and nullability.
///
/// Metadata handling:
/// - A parquet field id stored under either the Arrow key or the kernel key is parsed as an
///   integer and stored under the kernel key.
/// - All other entries are carried over via `MetadataValue::from`.
/// - Field ids found on list/map children are gathered into a JSON object stored under the
///   nested-ids key; the key is only written when at least one id was found.
///
/// # Errors
///
/// Returns [`ArrowError::SchemaError`] when the two field id keys are both present with
/// different text, or when a field id is not an integer. Errors from nested id collection
/// and from the data type conversion are propagated.
impl TryFromArrow<&ArrowField> for StructField {
    fn try_from_arrow(arrow_field: &ArrowField) -> Result<Self, ArrowError> {
        let arrow_metadata = arrow_field.metadata();

        // Reject fields that carry two different ids under the two possible keys.
        let arrow_id = arrow_metadata.get(PARQUET_FIELD_ID_META_KEY);
        let kernel_id = arrow_metadata.get(ColumnMetadataKey::ParquetFieldId.as_ref());
        match (arrow_id, kernel_id) {
            (Some(arrow_id), Some(kernel_id)) if arrow_id != kernel_id => {
                return Err(ArrowError::SchemaError(format!(
                    "Field '{}': conflicting parquet field IDs: '{}' ({}) vs '{}' ({})",
                    arrow_field.name(),
                    arrow_id,
                    PARQUET_FIELD_ID_META_KEY,
                    kernel_id,
                    ColumnMetadataKey::ParquetFieldId.as_ref(),
                )));
            }
            _ => {}
        }

        let mut kernel_metadata: HashMap<String, MetadataValue> = HashMap::new();
        for (k, v) in arrow_metadata.iter() {
            let is_field_id_key =
                k == PARQUET_FIELD_ID_META_KEY || k == ColumnMetadataKey::ParquetFieldId.as_ref();
            if is_field_id_key {
                let id = v.parse::<i64>().map_err(|_| {
                    ArrowError::SchemaError(format!(
                        "'{k}' on field '{}' must be an integer, got '{v}'",
                        arrow_field.name()
                    ))
                })?;
                // Either spelling of the key ends up under the kernel key.
                kernel_metadata.insert(
                    ColumnMetadataKey::ParquetFieldId.as_ref().to_string(),
                    MetadataValue::Number(id),
                );
            } else {
                kernel_metadata.insert(k.clone(), MetadataValue::from(v));
            }
        }

        let mut nested_ids = serde_json::Map::new();
        collect_nested_field_ids_from_arrow(
            arrow_field.data_type(),
            arrow_field.name(),
            &mut nested_ids,
        )?;
        if !nested_ids.is_empty() {
            let nested_ids_key = ColumnMetadataKey::ColumnMappingNestedIds
                .as_ref()
                .to_string();
            let nested_ids_value = MetadataValue::Other(serde_json::Value::Object(nested_ids));
            kernel_metadata.insert(nested_ids_key, nested_ids_value);
        }

        let kernel_type = DataType::try_from_arrow(arrow_field.data_type())?;
        let kernel_field = StructField::new(
            arrow_field.name().clone(),
            kernel_type,
            arrow_field.is_nullable(),
        );
        Ok(kernel_field.with_metadata(kernel_metadata))
    }
}
/// Walks the list/map structure of `arrow_type` and records, in `out`, the parquet field id
/// of every list element, map key, and map value that carries one.
///
/// Keys of `out` are dotted paths: `relative_path` followed by the element/key/value segment
/// of each nesting level. Dictionary types are looked through to their value type without
/// extending the path; every other non-list, non-map type ends the walk.
///
/// # Errors
///
/// Returns [`ArrowError::SchemaError`] when a map's entries type is not a two-field struct
/// or when a child's field id is not an integer.
fn collect_nested_field_ids_from_arrow(
    arrow_type: &ArrowDataType,
    relative_path: &str,
    out: &mut serde_json::Map<String, serde_json::Value>,
) -> Result<(), ArrowError> {
    // Records the id on one child field (when present), then descends into the child's type.
    fn visit_child(
        inner: &ArrowField,
        relative_path: &str,
        segment: &str,
        out: &mut serde_json::Map<String, serde_json::Value>,
    ) -> Result<(), ArrowError> {
        let path = format!("{relative_path}.{segment}");
        if let Some(id_str) = inner.metadata().get(PARQUET_FIELD_ID_META_KEY) {
            let id = id_str.parse::<i64>().map_err(|_| {
                ArrowError::SchemaError(format!(
                    "'{PARQUET_FIELD_ID_META_KEY}' on '{}' must be an integer, got '{id_str}'",
                    inner.name()
                ))
            })?;
            out.insert(path.clone(), serde_json::Value::from(id));
        }
        collect_nested_field_ids_from_arrow(inner.data_type(), &path, out)
    }

    match arrow_type {
        ArrowDataType::List(elem)
        | ArrowDataType::ListView(elem)
        | ArrowDataType::LargeList(elem)
        | ArrowDataType::LargeListView(elem)
        | ArrowDataType::FixedSizeList(elem, _) => {
            visit_child(elem, relative_path, LIST_ARRAY_ROOT, out)
        }
        ArrowDataType::Map(entries, _) => match entries.data_type() {
            ArrowDataType::Struct(fields) if fields.len() == 2 => {
                // Key is processed before value.
                visit_child(&fields[0], relative_path, MAP_KEY_DEFAULT, out)?;
                visit_child(&fields[1], relative_path, MAP_VALUE_DEFAULT, out)
            }
            other => Err(ArrowError::SchemaError(format!(
                "Map entries must be a struct with exactly 2 fields (key, value), got {other}"
            ))),
        },
        ArrowDataType::Dictionary(_, value_type) => {
            collect_nested_field_ids_from_arrow(value_type, relative_path, out)
        }
        _ => Ok(()),
    }
}
impl TryFromArrow<&ArrowDataType> for DataType {
fn try_from_arrow(arrow_datatype: &ArrowDataType) -> Result<Self, ArrowError> {
match arrow_datatype {
ArrowDataType::Utf8 => Ok(DataType::STRING),
ArrowDataType::LargeUtf8 => Ok(DataType::STRING),
ArrowDataType::Utf8View => Ok(DataType::STRING),
ArrowDataType::Int64 => Ok(DataType::LONG), ArrowDataType::Int32 => Ok(DataType::INTEGER),
ArrowDataType::Int16 => Ok(DataType::SHORT),
ArrowDataType::Int8 => Ok(DataType::BYTE),
ArrowDataType::UInt64 => Ok(DataType::LONG), ArrowDataType::UInt32 => Ok(DataType::INTEGER),
ArrowDataType::UInt16 => Ok(DataType::SHORT),
ArrowDataType::UInt8 => Ok(DataType::BYTE),
ArrowDataType::Float32 => Ok(DataType::FLOAT),
ArrowDataType::Float64 => Ok(DataType::DOUBLE),
ArrowDataType::Boolean => Ok(DataType::BOOLEAN),
ArrowDataType::Binary => Ok(DataType::BINARY),
ArrowDataType::FixedSizeBinary(_) => Ok(DataType::BINARY),
ArrowDataType::LargeBinary => Ok(DataType::BINARY),
ArrowDataType::BinaryView => Ok(DataType::BINARY),
ArrowDataType::Decimal128(p, s) => {
if *s < 0 {
return Err(ArrowError::from_external_error(
Error::invalid_decimal("Negative scales are not supported in Delta").into(),
));
};
DataType::decimal(*p, *s as u8)
.map_err(|e| ArrowError::from_external_error(e.into()))
}
ArrowDataType::Date32 => Ok(DataType::DATE),
ArrowDataType::Date64 => Ok(DataType::DATE),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => Ok(DataType::TIMESTAMP_NTZ),
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz))
if tz.eq_ignore_ascii_case("utc") =>
{
Ok(DataType::TIMESTAMP)
}
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) => Ok(DataType::TIMESTAMP_NTZ),
#[cfg(feature = "nanosecond-timestamps")]
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some(tz))
if tz.eq_ignore_ascii_case("utc") =>
{
Ok(DataType::TIMESTAMP_NANOS)
}
ArrowDataType::Struct(fields) => DataType::try_struct_type_from_results(
fields.iter().map(|field| field.as_ref().try_into_kernel()),
)
.map_err(|e| ArrowError::from_external_error(e.into())),
ArrowDataType::List(field) => Ok(ArrayType::new(
(*field).data_type().try_into_kernel()?,
(*field).is_nullable(),
)
.into()),
ArrowDataType::ListView(field) => Ok(ArrayType::new(
(*field).data_type().try_into_kernel()?,
(*field).is_nullable(),
)
.into()),
ArrowDataType::LargeList(field) => Ok(ArrayType::new(
(*field).data_type().try_into_kernel()?,
(*field).is_nullable(),
)
.into()),
ArrowDataType::LargeListView(field) => Ok(ArrayType::new(
(*field).data_type().try_into_kernel()?,
(*field).is_nullable(),
)
.into()),
ArrowDataType::FixedSizeList(field, _) => Ok(ArrayType::new(
(*field).data_type().try_into_kernel()?,
(*field).is_nullable(),
)
.into()),
ArrowDataType::Map(field, _) => {
if let ArrowDataType::Struct(struct_fields) = field.data_type() {
let key_type = DataType::try_from_arrow(struct_fields[0].data_type())?;
let value_type = DataType::try_from_arrow(struct_fields[1].data_type())?;
let value_type_nullable = struct_fields[1].is_nullable();
Ok(MapType::new(key_type, value_type, value_type_nullable).into())
} else {
unreachable!("DataType::Map should contain a struct field child");
}
}
ArrowDataType::Dictionary(_, value_type) => {
Ok(value_type.as_ref().try_into_kernel()?)
}
s => Err(ArrowError::SchemaError(format!(
"Invalid data type for Delta Lake: {s}"
))),
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use rstest::rstest;
use super::*;
use crate::engine::arrow_conversion::ArrowField;
use crate::engine::arrow_data::unshredded_variant_arrow_type;
use crate::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use crate::schema::{
ArrayType, ColumnMetadataKey, DataType, MapType, MetadataValue, StructField, StructType,
};
use crate::transforms::{transform_output_type, SchemaTransform};
use crate::utils::test_utils::{
array_in_map_kernel_schema, assert_result_error_with_message, collect_arrow_field_metadata,
complex_nested_with_field_ids,
};
use crate::DeltaResult;
// A string metadata value on a kernel field must arrive unchanged on the Arrow field.
#[test]
fn test_metadata_string_conversion() -> DeltaResult<()> {
    let kernel_field = StructField::not_null("name", DataType::STRING)
        .with_metadata([("description", "hello world".to_owned())]);

    let arrow_field = ArrowField::try_from_kernel(&kernel_field)?;

    let description = arrow_field.metadata().get("description").unwrap();
    assert_eq!(description.as_str(), "hello world");
    Ok(())
}
// The unshredded variant converts to its Arrow struct form; any other variant layout is
// rejected with an "Incorrect Variant Schema" error.
#[test]
fn test_variant_shredded_type_fail() -> DeltaResult<()> {
    let converted = ArrowDataType::try_from_kernel(&DataType::unshredded_variant())?;
    assert!(converted == unshredded_variant_arrow_type());

    let shredded = DataType::variant_type([
        StructField::nullable("metadata", DataType::BINARY),
        StructField::nullable("value", DataType::BINARY),
        StructField::nullable("typed_value", DataType::INTEGER),
    ])?;
    let error = ArrowDataType::try_from_kernel(&shredded).unwrap_err();
    let message = error.to_string();
    assert!(message.contains("Incorrect Variant Schema"));
    Ok(())
}
// Test helper: accumulates the parquet field ids seen while walking a kernel schema.
#[derive(Default)]
struct FieldIdCollector {
// (field name, field id rendered via `to_string`) pairs, in visit order.
field_ids: Vec<(String, String)>, }
// Schema visitor that records every struct field carrying a parquet field id.
impl<'a> SchemaTransform<'a> for FieldIdCollector {
// NOTE(review): presumably declares that this transform produces no output value (`()`);
// confirm against the macro definition.
transform_output_type!(|'a, T| ());
// Records the field's id (if it has one) and then continues into the field's children.
fn transform_struct_field(&mut self, field: &'a StructField) {
if let Some(field_id) = field
.metadata()
.get(ColumnMetadataKey::ParquetFieldId.as_ref())
{
self.field_ids
.push((field.name().to_string(), field_id.to_string()));
}
self.recurse_into_struct_field(field)
}
}
// Converting the shared nested fixture to Arrow must yield exactly the fixture's expected
// Arrow schema, including the field ids attached to nested list/map children.
#[test]
fn test_try_into_arrow_threads_nested_ids_onto_arrow_schema() -> DeltaResult<()> {
    let meta_key = ColumnMetadataKey::ColumnMappingNestedIds.as_ref();
    let fixture = complex_nested_with_field_ids(meta_key);

    let actual = ArrowSchema::try_from_kernel(&fixture.kernel_schema)?;

    assert_eq!(
        actual, fixture.expected_arrow_schema,
        "try_into_arrow should attach nested field ids for metadata key {meta_key}",
    );
    Ok(())
}
// Parameterized check of which Arrow fields receive a parquet field id:
// - without nested-id metadata, no field id is expected anywhere;
// - with nested-id metadata, only the entries whose paths are looked up during conversion
//   ("array_in_map.key", "array_in_map.value.element") are expected to produce ids, while the
//   "array_in_map.notTheKey" entry is expected to produce none.
#[rstest]
#[case::without_nested_id_metadata(array_in_map_kernel_schema(std::iter::empty::<(
String,
MetadataValue,
)>()), /* expected_field_ids */ &[])]
#[case::only_partial_nested_ids_match(array_in_map_kernel_schema([(
ColumnMetadataKey::ColumnMappingNestedIds.as_ref().to_string(),
MetadataValue::Other(test_utils::nested_ids_json(&[
("array_in_map.key", 100),
("array_in_map.value.element", 102),
("array_in_map.notTheKey", 999),
])),
)]), &[("element", "102"), ("key", "100")])]
fn test_try_into_arrow_sets_field_ids_for_matched_nested_ids_only(
#[case] schema: StructType,
#[case] expected_field_ids: &[(&str, &str)],
) -> DeltaResult<()> {
let arrow_schema: ArrowSchema = (&schema).try_into_arrow()?;
// NOTE(review): presumably keyed by Arrow field name — confirm against the helper.
let field_ids: HashMap<String, String> =
collect_arrow_field_metadata(&arrow_schema, PARQUET_FIELD_ID_META_KEY)
.into_iter()
.collect();
// Same count plus a per-entry match means no unexpected ids were attached.
assert_eq!(field_ids.len(), expected_field_ids.len());
for (field_name, expected_id) in expected_field_ids {
assert_eq!(
field_ids.get(*field_name).map(String::as_str),
Some(*expected_id),
);
}
Ok(())
}
// Malformed nested-id metadata must surface as an error during kernel -> Arrow conversion:
// first a value that is not a JSON object, then an object whose entry is not an integer.
#[test]
fn test_try_into_arrow_invalid_nested_ids_metadata_errors() {
    let cases = [
        (
            MetadataValue::String("not a json object".to_string()),
            "must be a JSON object",
        ),
        (
            MetadataValue::Other(serde_json::json!({ "array_in_map.key": "oops" })),
            "must be an integer",
        ),
    ];
    for (bad_value, expected_message) in cases {
        let nested_ids_key = ColumnMetadataKey::ColumnMappingNestedIds
            .as_ref()
            .to_string();
        let schema = array_in_map_kernel_schema([(nested_ids_key, bad_value)]);
        assert_result_error_with_message(ArrowSchema::try_from_kernel(&schema), expected_message);
    }
}
// Builds a schema with field ids at every struct level (top level, nested struct, struct
// inside an array, structs used as map key and value), converts it to Arrow, and checks that
// every id shows up under the Arrow parquet field id key. It then converts back to kernel and
// checks that the same ids are found under the kernel key.
#[test]
fn test_recursive_field_id_transformation() -> DeltaResult<()> {
// Struct nested directly inside a top-level field (id 3).
let inner_struct_type = StructType::try_new(vec![StructField::new(
"inner_field",
DataType::STRING,
false,
)
.with_metadata([(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(3),
)])])?;
// Struct used as the array element type (id 5).
let array_item_struct = StructType::try_new(vec![StructField::new(
"array_item",
DataType::INTEGER,
false,
)
.with_metadata([(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(5),
)])])?;
let array_type = ArrayType::new(DataType::Struct(Box::new(array_item_struct)), false);
// Structs used as the map key (id 7) and map value (id 8) types.
let map_key_struct = StructType::try_new(vec![StructField::new(
"map_key_field",
DataType::STRING,
false,
)
.with_metadata([(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(7),
)])])?;
let map_value_struct = StructType::try_new(vec![StructField::new(
"map_value_field",
DataType::INTEGER,
false,
)
.with_metadata([(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(8),
)])])?;
let map_type = MapType::new(
DataType::Struct(Box::new(map_key_struct)),
DataType::Struct(Box::new(map_value_struct)),
false,
);
// Top-level fields carry ids 1, 2, 4 and 6.
let top_struct = StructType::try_new(vec![
StructField::new("simple_field", DataType::INTEGER, false).with_metadata([(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(1),
)]),
StructField::new(
"nested_struct",
DataType::Struct(Box::new(inner_struct_type)),
false,
)
.with_metadata([(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(2),
)]),
StructField::new("array_field", DataType::Array(Box::new(array_type)), false)
.with_metadata([(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(4),
)]),
StructField::new("map_field", DataType::Map(Box::new(map_type)), false).with_metadata(
[(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(6),
)],
),
])?;
// Kernel -> Arrow.
let arrow_schema = ArrowSchema::try_from_kernel(&top_struct)?;
let expected_ids: HashMap<String, String> = [
("simple_field", "1"),
("nested_struct", "2"),
("inner_field", "3"),
("array_field", "4"),
("array_item", "5"),
("map_field", "6"),
("map_key_field", "7"),
("map_value_field", "8"),
]
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
// NOTE(review): presumably keyed by Arrow field name — confirm against the helper.
let arrow_field_ids: HashMap<String, String> =
collect_arrow_field_metadata(&arrow_schema, PARQUET_FIELD_ID_META_KEY)
.into_iter()
.collect();
assert_eq!(
arrow_field_ids, expected_ids,
"All field IDs should be transformed to PARQUET:field_id"
);
// Arrow -> kernel: the ids collected from the kernel schema must match the Arrow ones.
let kernel_struct = StructType::try_from_arrow(&arrow_schema)?;
let mut collector = FieldIdCollector::default();
collector.transform_struct(&kernel_struct);
let kernel_field_ids: HashMap<String, String> = collector.field_ids.into_iter().collect();
assert_eq!(
kernel_field_ids, arrow_field_ids,
"Kernel field IDs should match Arrow field IDs after round-trip"
);
Ok(())
}
// When both field id keys are present with the same value, conversion succeeds and the
// kernel field carries the id as a number.
#[test]
fn test_arrow_to_kernel_matching_field_ids_succeed() {
    let metadata = HashMap::from([
        (PARQUET_FIELD_ID_META_KEY.to_string(), "42".to_string()),
        (
            ColumnMetadataKey::ParquetFieldId.as_ref().to_string(),
            "42".to_string(),
        ),
    ]);
    let arrow_field = ArrowField::new("a", ArrowDataType::Int32, false).with_metadata(metadata);

    let kernel = StructField::try_from_arrow(&arrow_field).unwrap();

    let parquet_id = kernel
        .metadata()
        .get(ColumnMetadataKey::ParquetFieldId.as_ref());
    assert_eq!(parquet_id, Some(&MetadataValue::Number(42)));
}
// When the two field id keys disagree, conversion must fail with a conflict error.
#[test]
fn test_arrow_to_kernel_conflicting_field_ids_fail() {
    let metadata = HashMap::from([
        (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
        (
            ColumnMetadataKey::ParquetFieldId.as_ref().to_string(),
            "2".to_string(),
        ),
    ]);
    let arrow_field = ArrowField::new("a", ArrowDataType::Int32, false).with_metadata(metadata);

    let result = StructField::try_from_arrow(&arrow_field);

    assert_result_error_with_message(result, "conflicting parquet field IDs");
}
// Converting the fixture's expected Arrow schema back to kernel must reproduce the fixture's
// kernel schema, nested-id metadata included.
#[test]
fn test_try_from_arrow_aggregates_synthesized_nested_ids() {
    let nested_ids_key = ColumnMetadataKey::ColumnMappingNestedIds.as_ref();
    let fixture = complex_nested_with_field_ids(nested_ids_key);

    let round_tripped = StructType::try_from_arrow(&fixture.expected_arrow_schema).unwrap();

    assert_eq!(round_tripped, fixture.kernel_schema);
}
// An Arrow schema without any field ids must not gain a nested-ids metadata entry when
// converted to kernel.
#[test]
fn test_try_from_arrow_no_nested_ids_when_input_lacks_field_ids() {
    let source = array_in_map_kernel_schema(std::iter::empty::<(String, MetadataValue)>());
    let arrow_schema = ArrowSchema::try_from_kernel(&source).unwrap();

    let kernel_schema = StructType::try_from_arrow(&arrow_schema).unwrap();

    let array_in_map = kernel_schema.fields().next().unwrap();
    let nested_ids = array_in_map
        .metadata()
        .get(ColumnMetadataKey::ColumnMappingNestedIds.as_ref());
    assert!(nested_ids.is_none());
}
// A non-integer field id on a list element must make the Arrow -> kernel conversion fail.
#[test]
fn test_try_from_arrow_invalid_inner_field_id_errors() {
    let bad_id = HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "oops".to_string())]);
    let element = ArrowField::new("element", ArrowDataType::Int32, true).with_metadata(bad_id);
    let list_type = ArrowDataType::List(Arc::new(element));
    let list_field = ArrowField::new("arr", list_type, true);

    let result = StructField::try_from_arrow(&list_field);

    assert_result_error_with_message(result, "must be an integer");
}
// Every Arrow list flavour must contribute its element's field id to the nested-ids metadata
// of the converted kernel field, under the path "<field name>.element".
#[rstest]
#[case::list(ArrowDataType::List(arc_elem_with_id(42)))]
#[case::list_view(ArrowDataType::ListView(arc_elem_with_id(42)))]
#[case::large_list(ArrowDataType::LargeList(arc_elem_with_id(42)))]
#[case::large_list_view(ArrowDataType::LargeListView(arc_elem_with_id(42)))]
#[case::fixed_size_list(ArrowDataType::FixedSizeList(arc_elem_with_id(42), 3))]
fn test_try_from_arrow_aggregates_nested_id_for_all_list_kinds(
#[case] dt: ArrowDataType,
) -> DeltaResult<()> {
let arrow_field = ArrowField::new("arr", dt, true);
let kernel = StructField::try_from_arrow(&arrow_field)?;
// The nested-ids value is a JSON object keyed by dotted path.
assert_eq!(
kernel
.metadata()
.get(ColumnMetadataKey::ColumnMappingNestedIds.as_ref()),
Some(&MetadataValue::Other(
serde_json::json!({"arr.element": 42})
)),
);
Ok(())
}
// A dictionary wrapper must be transparent for nested id collection: the list element's id
// is recorded under "arr.element" with no extra path segment for the dictionary.
#[test]
fn test_try_from_arrow_aggregates_nested_id_through_dictionary() -> DeltaResult<()> {
    let list_with_id = ArrowDataType::List(arc_elem_with_id(7));
    let dictionary =
        ArrowDataType::Dictionary(Box::new(ArrowDataType::Int32), Box::new(list_with_id));
    let arrow_field = ArrowField::new("arr", dictionary, true);

    let kernel = StructField::try_from_arrow(&arrow_field)?;

    let expected = MetadataValue::Other(serde_json::json!({"arr.element": 7}));
    let nested_ids = kernel
        .metadata()
        .get(ColumnMetadataKey::ColumnMappingNestedIds.as_ref());
    assert_eq!(nested_ids, Some(&expected));
    Ok(())
}
fn arc_elem_with_id(id: i32) -> Arc<ArrowField> {
Arc::new(
ArrowField::new("element", ArrowDataType::Int32, true)
.with_metadata([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())].into()),
)
}
}