use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::sync::{Arc, OnceLock};
use delta_kernel_derive::internal_api;
use itertools::Itertools;
use tracing::debug;
use crate::arrow::array::cast::AsArray;
use crate::arrow::array::{
make_array, new_null_array, Array as ArrowArray, GenericListArray, MapArray, OffsetSizeTrait,
PrimitiveArray, RecordBatch, StringArray, StructArray,
};
use crate::arrow::buffer::NullBuffer;
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef,
Fields as ArrowFields, Int64Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use crate::arrow::json::writer::{make_encoder, LineDelimited, NullableEncoder};
use crate::arrow::json::{Encoder, EncoderFactory, EncoderOptions, ReaderBuilder, WriterBuilder};
use crate::engine::arrow_conversion::{TryFromKernel as _, TryIntoArrow as _};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::ensure_data_types::DataTypeCompat;
use crate::engine_data::FilteredEngineData;
use crate::parquet::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use crate::parquet::file::metadata::RowGroupMetaData;
use crate::parquet::schema::types::SchemaDescriptor;
use crate::schema::{
ColumnMetadataKey, DataType, MetadataColumnSpec, MetadataValue, Schema, SchemaRef, StructField,
StructType,
};
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error};
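// Compares a primitive array against a list array using arrow's `in_list` kernel. The macro
// matches `$left_arr`'s data type against each supplied `($data_ty, $prim_ty)` pair, downcasts
// the left side to that primitive array and the right side to a `ListArray<i32>`, and `return`s
// the comparison result from the enclosing function; an unsupported type pairing becomes a cast
// error.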
macro_rules! prim_array_cmp {
( $left_arr: ident, $right_arr: ident, $(($data_ty: pat, $prim_ty: ty)),+ ) => {
return match $left_arr.data_type() {
$(
$data_ty => {
let prim_array = $left_arr.as_primitive_opt::<$prim_ty>()
.ok_or(Error::invalid_expression(
format!("Cannot cast to primitive array: {}", $left_arr.data_type()))
)?;
let list_array = $right_arr.as_list_opt::<i32>()
.ok_or(Error::invalid_expression(
format!("Cannot cast to list array: {}", $right_arr.data_type()))
)?;
crate::arrow::compute::kernels::comparison::in_list(prim_array, list_array)
}
)+
        _ => Err(crate::arrow::error::ArrowError::CastError(
format!("Bad Comparison between: {:?} and {:?}",
$left_arr.data_type(),
$right_arr.data_type())
)
)
}.map_err(Error::generic_err);
};
}
pub(crate) use prim_array_cmp;
type FieldIndex = usize;
type FlattenedRangeIterator<T> = std::iter::Flatten<std::vec::IntoIter<Range<T>>>;
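/// The requested kernel field that a parquet field was matched to: the field's position within
/// the requested kernel schema (i.e. the position the column should occupy in the reordered
/// output) and a reference to the field itself.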
struct KernelFieldInfo<'k> {
parquet_index: FieldIndex,
field: &'k StructField,
}
struct MatchedParquetField<'p, 'k> {
parquet_index: FieldIndex,
parquet_field: &'p ArrowField,
kernel_field_info: Option<KernelFieldInfo<'k>>,
}
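/// Convert a message into an arrow `InvalidArgumentError` wrapped as a kernel [`Error`], with a
/// backtrace attached.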
#[internal_api]
pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
Error::Arrow(crate::arrow::error::ArrowError::InvalidArgumentError(
s.into(),
))
.with_backtrace()
}
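/// Computes file-level row indexes for the rows read out of a parquet file.
///
/// [`RowIndexBuilder::new`] records the row-index range covered by each row group;
/// [`RowIndexBuilder::select_row_groups`] optionally restricts the result to the row groups that
/// survived row-group skipping; [`RowIndexBuilder::build`] flattens the selected ranges into an
/// iterator of row indexes in the order the row groups will be read.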
#[internal_api]
pub(crate) struct RowIndexBuilder {
row_group_row_index_ranges: Vec<Range<i64>>,
row_group_ordinals: Option<Vec<usize>>,
}
impl RowIndexBuilder {
#[internal_api]
pub(crate) fn new(row_groups: &[RowGroupMetaData]) -> Self {
let mut row_group_row_index_ranges = Vec::with_capacity(row_groups.len());
let mut offset = 0;
for row_group in row_groups {
let num_rows = row_group.num_rows();
row_group_row_index_ranges.push(offset..offset + num_rows);
offset += num_rows;
}
Self {
row_group_row_index_ranges,
row_group_ordinals: None,
}
}
#[internal_api]
pub(crate) fn select_row_groups(&mut self, ordinals: &[usize]) {
self.row_group_ordinals = Some(ordinals.to_vec())
}
#[internal_api]
pub(crate) fn build(self) -> DeltaResult<FlattenedRangeIterator<i64>> {
let starting_offsets = match self.row_group_ordinals {
Some(ordinals) => {
let mut seen_ordinals = HashSet::with_capacity(ordinals.len());
ordinals
.iter()
.map(|&i| {
if !seen_ordinals.insert(i) {
return Err(Error::generic("Found duplicate row group ordinal"));
}
self.row_group_row_index_ranges
.get(i)
.cloned()
.ok_or_else(|| {
Error::generic(format!("Row group ordinal {i} is out of bounds"))
})
})
.try_collect()?
}
None => self.row_group_row_index_ranges,
};
Ok(starting_offsets.into_iter().flatten())
}
}
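/// Post-processes a freshly read parquet [`RecordBatch`]: applies the requested reordering and
/// transforms, unions nested null masks, and, when a `target_schema` is provided, aligns field
/// nullability with that schema (data-type differences are tolerated during that last step).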
#[internal_api]
pub(crate) fn fixup_parquet_read(
batch: RecordBatch,
requested_ordering: &[ReorderIndex],
row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
file_location: Option<&str>,
target_schema: Option<&ArrowSchemaRef>,
) -> DeltaResult<ArrowEngineData> {
let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes, file_location)?;
let data = fix_nested_null_masks(data);
let data = if let Some(schema) = target_schema {
let batch = RecordBatch::from(data);
let allow_all = |_: &ArrowFieldRef, _: &ArrowFieldRef| Ok(());
coerce_batch_nullability(batch, schema, Some(&allow_all))?.into()
} else {
data
};
Ok(data.into())
}
type TypeMismatchValidator<'a> =
Option<&'a dyn Fn(&ArrowFieldRef, &ArrowFieldRef) -> DeltaResult<()>>;
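/// Rebuilds `batch` so that the nullability flags in its schema match `target_schema`, recursing
/// into struct, map, and list children. Only field metadata changes; the underlying buffers are
/// reused. When source and target data types differ, the optional `type_mismatch_validator`
/// decides whether the mismatch is acceptable; without a validator, a mismatch is an internal
/// error.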
pub(crate) fn coerce_batch_nullability(
batch: RecordBatch,
target_schema: &ArrowSchemaRef,
type_mismatch_validator: TypeMismatchValidator<'_>,
) -> DeltaResult<RecordBatch> {
if *batch.schema() == **target_schema {
return Ok(batch);
}
fn coerce_struct(
src_column: &Arc<dyn ArrowArray>,
src_children: &ArrowFields,
target_children: &ArrowFields,
type_mismatch_validator: TypeMismatchValidator<'_>,
) -> DeltaResult<Arc<dyn ArrowArray>> {
let src_struct = src_column
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| Error::generic("expected Struct array during nullability coercion"))?;
let (coerced_columns, coerced_fields): (Vec<Arc<dyn ArrowArray>>, Vec<ArrowFieldRef>) =
src_struct
.columns()
.iter()
.zip(src_children.iter())
.zip(target_children.iter())
.map(|((src_child_col, src_child), target_child)| {
coerce(
src_child_col.clone(),
src_child,
target_child,
type_mismatch_validator,
)
})
.collect::<DeltaResult<Vec<_>>>()?
.into_iter()
.unzip();
Ok(Arc::new(StructArray::try_new(
coerced_fields.into(),
coerced_columns,
src_struct.nulls().cloned(),
)?))
}
fn coerce_map(
src_column: &Arc<dyn ArrowArray>,
src_entries_field: &ArrowFieldRef,
target_entries_field: &ArrowFieldRef,
type_mismatch_validator: TypeMismatchValidator<'_>,
) -> DeltaResult<Arc<dyn ArrowArray>> {
let src_map = src_column
.as_any()
.downcast_ref::<MapArray>()
.ok_or_else(|| Error::generic("expected Map array during nullability coercion"))?;
let (_, src_offsets, src_entries, src_nulls, src_ordered) = src_map.clone().into_parts();
let (coerced_entries_col, coerced_entries_field) = coerce(
Arc::new(src_entries),
src_entries_field,
target_entries_field,
type_mismatch_validator,
)?;
let coerced_entries = coerced_entries_col
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
Error::generic("expected Struct array for Map entries during nullability coercion")
})?
.clone();
Ok(Arc::new(MapArray::try_new(
coerced_entries_field,
src_offsets,
coerced_entries,
src_nulls,
src_ordered,
)?))
}
fn coerce_list(
src_column: &Arc<dyn ArrowArray>,
src_element: &ArrowFieldRef,
target_element: &ArrowFieldRef,
type_mismatch_validator: TypeMismatchValidator<'_>,
) -> DeltaResult<Arc<dyn ArrowArray>> {
let src_list = src_column
.as_any()
.downcast_ref::<GenericListArray<i32>>()
.ok_or_else(|| Error::generic("expected List array during nullability coercion"))?;
let (_, src_offsets, src_values, src_nulls) = src_list.clone().into_parts();
let (coerced_values, coerced_element_field) = coerce(
src_values,
src_element,
target_element,
type_mismatch_validator,
)?;
Ok(Arc::new(GenericListArray::<i32>::try_new(
coerced_element_field,
src_offsets,
coerced_values,
src_nulls,
)?))
}
fn coerce(
src_column: Arc<dyn ArrowArray>,
src_field: &ArrowFieldRef,
target_field: &ArrowFieldRef,
type_mismatch_validator: TypeMismatchValidator<'_>,
) -> DeltaResult<(Arc<dyn ArrowArray>, ArrowFieldRef)> {
let coerced_array: Arc<dyn ArrowArray> =
match (src_column.data_type(), target_field.data_type()) {
(ArrowDataType::Struct(src_children), ArrowDataType::Struct(target_children))
if src_children != target_children =>
{
coerce_struct(
&src_column,
src_children,
target_children,
type_mismatch_validator,
)?
}
(
ArrowDataType::Map(src_entries_field, _),
ArrowDataType::Map(target_entries_field, _),
) if src_entries_field != target_entries_field => coerce_map(
&src_column,
src_entries_field,
target_entries_field,
type_mismatch_validator,
)?,
(ArrowDataType::List(src_element), ArrowDataType::List(target_element))
if src_element != target_element =>
{
coerce_list(
&src_column,
src_element,
target_element,
type_mismatch_validator,
)?
}
(src_type, target_type) => {
if src_type != target_type {
if let Some(validator) = type_mismatch_validator {
validator(src_field, target_field)?;
} else {
return Err(Error::internal_error(format!(
"data type mismatch for field '{}': \
source has {src_type:?} but target has {target_type:?}",
src_field.name(),
)));
}
}
let coerced_field = if src_field.is_nullable() == target_field.is_nullable() {
src_field.clone()
} else {
Arc::new(
src_field
.as_ref()
.clone()
.with_nullable(target_field.is_nullable()),
)
};
return Ok((src_column, coerced_field));
}
};
let coerced_field = Arc::new(
src_field
.as_ref()
.clone()
.with_data_type(coerced_array.data_type().clone())
.with_nullable(target_field.is_nullable()),
);
Ok((coerced_array, coerced_field))
}
let batch_schema = batch.schema();
let batch_struct_array = StructArray::from(batch);
let (src_fields, src_columns, _) = batch_struct_array.into_parts();
let (coerced_columns, coerced_fields): (Vec<Arc<dyn ArrowArray>>, Vec<ArrowFieldRef>) =
src_columns
.into_iter()
.zip(src_fields.iter())
.zip(target_schema.fields().iter())
.map(|((src_column, src_field), target_field)| {
coerce(src_column, src_field, target_field, type_mismatch_validator)
})
.collect::<DeltaResult<Vec<_>>>()?
.into_iter()
.unzip();
let coerced_schema = Arc::new(ArrowSchema::new_with_metadata(
coerced_fields,
batch_schema.metadata().clone(),
));
Ok(RecordBatch::try_new(coerced_schema, coerced_columns)?)
}
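/// Describes where the column read at a given physical position belongs in the requested output
/// (`index`) and which transform is needed to get it there: `Cast` to a target arrow type,
/// `Nested` reordering of struct/list/map children, `Identity` (use as-is), `Missing` (absent
/// from the file, fill with nulls), `RowIndex` (materialize file row indexes), or `FilePath`
/// (materialize the file's location).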
#[derive(Debug, PartialEq)]
#[internal_api]
pub(crate) struct ReorderIndex {
pub index: usize,
transform: ReorderIndexTransform,
}
#[derive(Debug, PartialEq)]
#[internal_api]
pub(crate) enum ReorderIndexTransform {
Cast(ArrowDataType),
Nested(Vec<ReorderIndex>),
Identity,
Missing(ArrowFieldRef),
RowIndex(ArrowFieldRef),
FilePath(ArrowFieldRef),
}
impl ReorderIndex {
fn new(index: usize, transform: ReorderIndexTransform) -> Self {
ReorderIndex { index, transform }
}
fn cast(index: usize, target: ArrowDataType) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::Cast(target))
}
fn nested(index: usize, children: Vec<ReorderIndex>) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::Nested(children))
}
fn identity(index: usize) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::Identity)
}
fn missing(index: usize, field: ArrowFieldRef) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::Missing(field))
}
fn row_index(index: usize, field: ArrowFieldRef) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::RowIndex(field))
}
fn file_path(index: usize, field: ArrowFieldRef) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::FilePath(field))
}
fn needs_transform(&self) -> bool {
match self.transform {
ReorderIndexTransform::Cast(_)
| ReorderIndexTransform::Missing(_)
| ReorderIndexTransform::RowIndex(_)
| ReorderIndexTransform::FilePath(_) => true,
ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children),
ReorderIndexTransform::Identity => false,
}
}
}
fn count_cols(field: &ArrowField) -> usize {
_count_cols(field.data_type())
}
fn _count_cols(dt: &ArrowDataType) -> usize {
match dt {
ArrowDataType::Struct(fields) => fields.iter().map(|f| count_cols(f)).sum(),
ArrowDataType::Union(fields, _) => fields.iter().map(|(_, f)| count_cols(f)).sum(),
ArrowDataType::List(field)
| ArrowDataType::LargeList(field)
| ArrowDataType::FixedSizeList(field, _)
| ArrowDataType::Map(field, _) => count_cols(field),
ArrowDataType::Dictionary(_, value_field) => _count_cols(value_field.as_ref()),
        _ => 1,
    }
}
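// A field read as an unshredded variant must be backed by a two-field struct containing exactly
// `value` and `metadata`; anything else (e.g. a shredded `typed_value` column) is rejected
// because the default engine does not support shredded reads.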
fn validate_parquet_variant(field: &ArrowField) -> DeltaResult<()> {
    fn variant_parquet_error(field_name: &str) -> Error {
Error::Generic(format!(
"The field {field_name} presumed to be of Variant type might be \
shredded in the parquet file. The default engine does not support \
shredded reads yet."
))
}
match field.data_type() {
ArrowDataType::Struct(fields) => {
if fields.len() != 2 {
return Err(variant_parquet_error(field.name()));
}
if !matches!(
(fields[0].name().as_str(), fields[1].name().as_str()),
("value", "metadata") | ("metadata", "value")
) {
return Err(variant_parquet_error(field.name()));
}
Ok(())
}
_ => Err(variant_parquet_error(field.name())),
}
}
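/// Recursively walks the parquet arrow `fields` against `requested_schema`, starting
/// `start_parquet_offset` leaf columns into the file. For every selected leaf column it pushes
/// that leaf's index into `mask_indices` (used later to build the parquet [`ProjectionMask`])
/// and records a [`ReorderIndex`] describing how to move/transform the column into its requested
/// position; structs, lists, and maps recurse into their children. Requested fields that are not
/// present in the file are appended at the end as `Missing`, `RowIndex`, or `FilePath` entries.
/// Returns the number of parquet columns advanced along with the reorder list.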
fn get_indices(
start_parquet_offset: usize,
requested_schema: &Schema,
fields: &ArrowFields,
mask_indices: &mut Vec<usize>,
) -> DeltaResult<(usize, Vec<ReorderIndex>)> {
let mut found_fields = HashSet::with_capacity(requested_schema.num_fields());
let mut reorder_indices = Vec::with_capacity(requested_schema.num_fields());
let mut deferred_missing = Vec::new();
let mut parquet_offset = start_parquet_offset;
let matched_parquet_fields = match_parquet_fields(requested_schema, fields);
for MatchedParquetField {
parquet_index,
parquet_field: field,
kernel_field_info,
} in matched_parquet_fields
{
debug!(
"Getting indices for field {} with offset {parquet_offset}, with index {parquet_index}",
field.name()
);
if let Some(KernelFieldInfo {
parquet_index: index,
field: requested_field,
..
}) = kernel_field_info
{
if requested_field.data_type == DataType::unshredded_variant() {
validate_parquet_variant(field)?;
}
match field.data_type() {
ArrowDataType::Struct(fields) => {
if let DataType::Struct(ref requested_schema)
| DataType::Variant(ref requested_schema) = requested_field.data_type
{
let mask_before = mask_indices.len();
let (parquet_advance, children) = get_indices(
parquet_index + parquet_offset,
requested_schema.as_ref(),
fields,
mask_indices,
)?;
parquet_offset += parquet_advance.saturating_sub(1);
found_fields.insert(requested_field.name());
if mask_indices.len() > mask_before {
reorder_indices.push(ReorderIndex::nested(index, children));
} else {
debug_assert_eq!(children.len(), requested_schema.num_fields());
deferred_missing.push(ReorderIndex::missing(
index,
Arc::new(requested_field.try_into_arrow()?),
));
}
} else {
return Err(Error::unexpected_column_type(field.name()));
}
}
ArrowDataType::List(list_field)
| ArrowDataType::LargeList(list_field)
| ArrowDataType::ListView(list_field) => {
if let DataType::Array(array_type) = requested_field.data_type() {
let requested_schema = StructType::new_unchecked([StructField::new(
                            list_field.name().clone(),
                            array_type.element_type.clone(),
array_type.contains_null,
)]);
let mask_before = mask_indices.len();
let (parquet_advance, mut children) = get_indices(
parquet_index + parquet_offset,
&requested_schema,
&[list_field.clone()].into(),
mask_indices,
)?;
parquet_offset += parquet_advance - 1;
found_fields.insert(requested_field.name());
if mask_indices.len() <= mask_before {
deferred_missing.push(ReorderIndex::missing(
index,
Arc::new(requested_field.try_into_arrow()?),
));
} else if children.len() != 1 {
                            return Err(Error::generic(
                                "List call should have generated exactly one reorder index",
                            ));
} else {
let mut children = children.swap_remove(0);
children.index = index;
reorder_indices.push(children);
}
} else {
return Err(Error::unexpected_column_type(list_field.name()));
}
}
ArrowDataType::Map(key_val_field, _) => {
match (key_val_field.data_type(), requested_field.data_type()) {
(ArrowDataType::Struct(inner_fields), DataType::Map(map_type)) => {
let mut key_val_names =
inner_fields.iter().map(|f| f.name().to_string());
let key_name = key_val_names.next().ok_or_else(|| {
Error::generic("map fields didn't include a key col")
})?;
let val_name = key_val_names.next().ok_or_else(|| {
Error::generic("map fields didn't include a val col")
})?;
if key_val_names.next().is_some() {
return Err(Error::generic("map fields had more than 2 members"));
}
let inner_schema = map_type.as_struct_schema(key_name, val_name);
let mask_before = mask_indices.len();
let (parquet_advance, mut children) = get_indices(
parquet_index + parquet_offset,
&inner_schema,
inner_fields,
mask_indices,
)?;
parquet_offset += parquet_advance - 1;
found_fields.insert(requested_field.name());
if mask_indices.len() <= mask_before {
deferred_missing.push(ReorderIndex::missing(
index,
Arc::new(requested_field.try_into_arrow()?),
));
} else if children.len() != 2 {
return Err(Error::generic(
"Map call should have generated exactly two reorder indices",
));
} else {
let mut num_identity_transforms = 0;
if !children[0].needs_transform() {
children[0] = ReorderIndex::identity(0);
num_identity_transforms += 1;
}
if !children[1].needs_transform() {
children[1] = ReorderIndex::identity(1);
num_identity_transforms += 1;
}
let transform = match num_identity_transforms {
2 => ReorderIndex::identity(index),
_ => ReorderIndex::nested(index, children),
};
reorder_indices.push(transform);
}
}
_ => {
return Err(Error::unexpected_column_type(field.name()));
}
}
}
_ => {
match super::ensure_data_types::ensure_data_types(
&requested_field.data_type,
field.data_type(),
super::ensure_data_types::ValidationMode::TypesAndNames,
)? {
DataTypeCompat::Identical => {
reorder_indices.push(ReorderIndex::identity(index))
}
DataTypeCompat::NeedsCast(target) => {
reorder_indices.push(ReorderIndex::cast(index, target))
}
DataTypeCompat::Nested => {
return Err(Error::internal_error(
"Comparing nested types in get_indices",
))
}
}
found_fields.insert(requested_field.name());
mask_indices.push(parquet_offset + parquet_index);
}
}
} else {
debug!("Skipping over un-selected field: {}", field.name());
parquet_offset += count_cols(field).saturating_sub(1);
}
}
reorder_indices.extend(deferred_missing);
if found_fields.len() != requested_schema.num_fields() {
for (requested_position, field) in requested_schema.fields().enumerate() {
if !found_fields.contains(field.name()) {
match field.get_metadata_column_spec() {
Some(MetadataColumnSpec::RowIndex) => {
debug!("Inserting a row index column: {}", field.name());
reorder_indices.push(ReorderIndex::row_index(
requested_position,
Arc::new(field.try_into_arrow()?),
));
}
Some(MetadataColumnSpec::FilePath) => {
debug!("Inserting a file path column: {}", field.name());
reorder_indices.push(ReorderIndex::file_path(
requested_position,
Arc::new(field.try_into_arrow()?),
));
}
Some(metadata_spec) => {
return Err(Error::Generic(format!(
"Metadata column {metadata_spec:?} is not supported by the default parquet reader"
)));
}
None if field.nullable => {
debug!("Inserting missing and nullable field: {}", field.name());
reorder_indices.push(ReorderIndex::missing(
requested_position,
Arc::new(field.try_into_arrow()?),
));
}
None => {
return Err(Error::Generic(format!(
"Requested field not found in parquet schema, and field is not nullable: {}",
field.name()
)));
}
}
}
}
}
Ok((
parquet_offset + fields.len() - start_parquet_offset,
reorder_indices,
))
}
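/// Pairs each parquet field with the kernel field it satisfies, yielding one
/// [`MatchedParquetField`] per parquet field. If the parquet field carries a
/// `PARQUET_FIELD_ID_META_KEY` field id and the kernel schema maps that id (via
/// [`ColumnMetadataKey::ParquetFieldId`]) to a name, that name is used for the lookup; otherwise
/// the parquet field's own name is used. Kernel metadata columns are never matched, so they are
/// generated rather than read.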
fn match_parquet_fields<'k, 'p>(
kernel_schema: &'k StructType,
parquet_fields: &'p ArrowFields,
) -> impl Iterator<Item = MatchedParquetField<'p, 'k>> {
type FieldId = i64;
let field_id_to_name: OnceLock<HashMap<FieldId, &String>> = OnceLock::new();
let init_field_map = || {
kernel_schema
.fields()
.filter_map(
|field| match field.get_config_value(&ColumnMetadataKey::ParquetFieldId) {
Some(MetadataValue::Number(fid)) => Some((*fid, field.name())),
_ => None,
},
)
.collect()
};
parquet_fields
.iter()
.enumerate()
.map(move |(parquet_index, parquet_field)| {
let parquet_field_id = parquet_field
.metadata()
.get(PARQUET_FIELD_ID_META_KEY)
.and_then(|x| x.parse::<FieldId>().ok());
let field_name = parquet_field_id
.and_then(|field_id| {
field_id_to_name
.get_or_init(init_field_map)
.get(&field_id)
.copied()
})
.unwrap_or_else(|| parquet_field.name());
let kernel_field_info =
kernel_schema
.field_with_index(field_name)
.and_then(|(idx, field)| {
(!field.is_metadata_column()).then_some(KernelFieldInfo {
parquet_index: idx,
field,
})
});
MatchedParquetField {
parquet_index,
parquet_field,
kernel_field_info,
}
})
}
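/// Computes, for a requested kernel schema and a parquet file's arrow schema, the leaf-column
/// indices that must be read and the [`ReorderIndex`] list needed to turn the data that is read
/// into the requested shape.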
#[internal_api]
pub(crate) fn get_requested_indices(
requested_schema: &SchemaRef,
parquet_schema: &ArrowSchemaRef,
) -> DeltaResult<(Vec<usize>, Vec<ReorderIndex>)> {
let mut mask_indices = vec![];
let (_, reorder_indexes) = get_indices(
0,
requested_schema,
parquet_schema.fields(),
&mut mask_indices,
)?;
Ok((mask_indices, reorder_indexes))
}
#[internal_api]
pub(crate) fn generate_mask(
_requested_schema: &SchemaRef,
_parquet_schema: &ArrowSchemaRef,
parquet_physical_schema: &SchemaDescriptor,
indices: &[usize],
) -> Option<ProjectionMask> {
Some(ProjectionMask::leaves(
parquet_physical_schema,
indices.to_owned(),
))
}
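// True if any entry needs a transform, or if the entries are not already in strictly ascending
// output order (in which case the columns must at least be rearranged).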
fn ordering_needs_transform(requested_ordering: &[ReorderIndex]) -> bool {
if requested_ordering.is_empty() {
return false;
}
if requested_ordering[0].needs_transform() {
return true;
}
requested_ordering
.windows(2)
.any(|ri| (ri[0].index >= ri[1].index) || ri[1].needs_transform())
}
#[internal_api]
pub(crate) fn ordering_needs_row_indexes(requested_ordering: &[ReorderIndex]) -> bool {
requested_ordering
.iter()
.any(|reorder_index| matches!(&reorder_index.transform, ReorderIndexTransform::RowIndex(_)))
}
type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
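// Builds an all-null array for a field that is missing from the file. A non-nullable struct
// cannot simply be null at the struct level, so it is materialized as a valid struct whose
// children are themselves missing/null.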
fn new_missing_array(field: &ArrowField, num_rows: usize) -> Arc<dyn ArrowArray> {
match (field.is_nullable(), field.data_type()) {
(false, ArrowDataType::Struct(child_fields)) => {
let child_arrays: Vec<Arc<dyn ArrowArray>> = child_fields
.iter()
.map(|f| new_missing_array(f, num_rows))
.collect();
Arc::new(StructArray::new(child_fields.clone(), child_arrays, None))
}
_ => new_null_array(field.data_type(), num_rows),
}
}
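/// Applies `requested_ordering` to `input_data`: casts columns, recursively reorders nested
/// struct/list/map children, fills in missing columns with nulls, and materializes row-index and
/// file-path columns from `row_indexes` / `file_location`. If no transform is needed, the input
/// is returned unchanged.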
pub(crate) fn reorder_struct_array(
input_data: StructArray,
requested_ordering: &[ReorderIndex],
mut row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
file_location: Option<&str>,
) -> DeltaResult<StructArray> {
debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
if !ordering_needs_transform(requested_ordering) {
Ok(input_data)
} else {
debug!("Have requested reorder {requested_ordering:#?} on {input_data:?}");
let num_rows = input_data.len();
let num_cols = requested_ordering.len();
let (input_fields, input_cols, null_buffer) = input_data.into_parts();
let mut final_fields_cols: Vec<FieldArrayOpt> = vec![None; num_cols];
for (parquet_position, reorder_index) in requested_ordering.iter().enumerate() {
match &reorder_index.transform {
ReorderIndexTransform::Cast(target) => {
let col = input_cols[parquet_position].as_ref();
let col = Arc::new(crate::arrow::compute::cast(col, target)?);
let new_field = Arc::new(
input_fields[parquet_position]
.as_ref()
.clone()
.with_data_type(col.data_type().clone()),
);
final_fields_cols[reorder_index.index] = Some((new_field, col));
}
ReorderIndexTransform::Nested(children) => {
let input_field_name = input_fields[parquet_position].name();
match input_cols[parquet_position].data_type() {
ArrowDataType::Struct(_) => {
let struct_array = input_cols[parquet_position].as_struct().clone();
let result_array = Arc::new(reorder_struct_array(
struct_array,
children,
None,
None,
)?);
let new_field = Arc::new(ArrowField::new_struct(
input_field_name,
result_array.fields().clone(),
input_fields[parquet_position].is_nullable(),
));
final_fields_cols[reorder_index.index] =
Some((new_field, result_array));
}
ArrowDataType::List(_) => {
let list_array = input_cols[parquet_position].as_list::<i32>().clone();
final_fields_cols[reorder_index.index] =
reorder_list(list_array, input_field_name, children)?;
}
ArrowDataType::LargeList(_) => {
let list_array = input_cols[parquet_position].as_list::<i64>().clone();
final_fields_cols[reorder_index.index] =
reorder_list(list_array, input_field_name, children)?;
}
ArrowDataType::Map(_, _) => {
let map_array = input_cols[parquet_position].as_map().clone();
final_fields_cols[reorder_index.index] =
reorder_map(map_array, input_field_name, children)?;
}
_ => {
return Err(Error::internal_error(
"Nested reorder can only apply to struct/list/map.",
));
}
}
}
ReorderIndexTransform::Identity => {
final_fields_cols[reorder_index.index] = Some((
                        input_fields[parquet_position].clone(),
                        input_cols[parquet_position].clone(),
                    ));
}
ReorderIndexTransform::Missing(field) => {
let array = new_missing_array(field, num_rows);
final_fields_cols[reorder_index.index] = Some((field.clone(), array));
}
ReorderIndexTransform::RowIndex(field) => {
let Some(ref mut row_index_iter) = row_indexes else {
return Err(Error::generic(
"Row index column requested but row index iterator not provided",
));
};
let row_index_array: PrimitiveArray<Int64Type> =
row_index_iter.take(num_rows).collect();
require!(
row_index_array.len() == num_rows,
Error::internal_error(
"Row index iterator exhausted before reaching the end of the file"
)
);
final_fields_cols[reorder_index.index] =
Some((Arc::clone(field), Arc::new(row_index_array)));
}
ReorderIndexTransform::FilePath(field) => {
let Some(file_path) = file_location else {
return Err(Error::generic(
"File path column requested but file location not provided",
));
};
let file_path_array = StringArray::from(vec![file_path; num_rows]);
final_fields_cols[reorder_index.index] =
Some((Arc::clone(field), Arc::new(file_path_array)));
}
}
}
let num_cols = final_fields_cols.len();
let (field_vec, reordered_columns): (Vec<Arc<ArrowField>>, _) =
final_fields_cols.into_iter().flatten().unzip();
if field_vec.len() != num_cols {
Err(Error::internal_error("Found a None in final_fields_cols."))
} else {
Ok(StructArray::try_new(
field_vec.into(),
reordered_columns,
null_buffer,
)?)
}
}
}
fn reorder_list<O: OffsetSizeTrait>(
list_array: GenericListArray<O>,
input_field_name: &str,
children: &[ReorderIndex],
) -> DeltaResult<FieldArrayOpt> {
let (list_field, offset_buffer, maybe_sa, null_buf) = list_array.into_parts();
if let Some(struct_array) = maybe_sa.as_struct_opt() {
let struct_array = struct_array.clone();
let result_array = Arc::new(reorder_struct_array(
struct_array,
children,
None,
        None,
    )?);
let new_list_field = Arc::new(ArrowField::new_struct(
list_field.name(),
result_array.fields().clone(),
result_array.is_nullable(),
));
let new_field = Arc::new(ArrowField::new_list(
input_field_name,
new_list_field.clone(),
list_field.is_nullable(),
));
let list = Arc::new(GenericListArray::try_new(
new_list_field,
offset_buffer,
result_array,
null_buf,
)?);
Ok(Some((new_field, list)))
} else {
Err(Error::internal_error(
"Nested reorder of list should have had struct child.",
))
}
}
fn reorder_map(
map_array: MapArray,
input_field_name: &str,
children: &[ReorderIndex],
) -> DeltaResult<FieldArrayOpt> {
let (map_field, offset_buffer, struct_array, null_buf, ordered) = map_array.into_parts();
let result_array = reorder_struct_array(
struct_array,
children,
        None,
        None,
    )?;
let result_fields = result_array.fields();
let new_map_field = Arc::new(ArrowField::new_struct(
map_field.name(),
result_fields.clone(),
result_array.is_nullable(),
));
let key_field = result_fields[0].clone();
let val_field = result_fields[1].clone();
let new_field = Arc::new(ArrowField::new_map(
input_field_name,
map_field.name(),
key_field,
val_field,
ordered,
map_field.is_nullable(),
));
let map = Arc::new(MapArray::try_new(
new_map_field,
offset_buffer,
result_array,
null_buf,
ordered,
)?);
Ok(Some((new_field, map)))
}
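/// Recursively unions each struct's null mask into its children so that every nested column's
/// validity buffer also reflects the nulls of its ancestors.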
pub fn fix_nested_null_masks(batch: StructArray) -> StructArray {
compute_nested_null_masks(batch, None)
}
fn compute_nested_null_masks(sa: StructArray, parent_nulls: Option<&NullBuffer>) -> StructArray {
let (fields, columns, nulls) = sa.into_parts();
let nulls = NullBuffer::union(parent_nulls, nulls.as_ref());
let columns = columns
.into_iter()
.map(|column| match column.as_struct_opt() {
Some(sa) => Arc::new(compute_nested_null_masks(sa.clone(), nulls.as_ref())) as _,
None => {
let data = column.to_data();
let nulls = NullBuffer::union(nulls.as_ref(), data.nulls());
let builder = data.into_builder().nulls(nulls);
let data = unsafe { builder.build_unchecked() };
make_array(data)
}
})
.collect();
unsafe { StructArray::new_unchecked(fields, columns, nulls) }
}
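/// Parses a single-column [`StringArray`] of JSON documents (one object per row) into engine
/// data with the given kernel schema. Null input rows become all-null output rows.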
#[internal_api]
pub(crate) fn parse_json(
json_strings: Box<dyn EngineData>,
schema: SchemaRef,
) -> DeltaResult<Box<dyn EngineData>> {
let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into();
let json_strings = json_strings
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
Error::generic("Expected json_strings to be a StringArray, found something else")
})?;
let schema = Arc::new(ArrowSchema::try_from_kernel(schema.as_ref())?);
let result = parse_json_impl(json_strings, schema)?;
Ok(Box::new(ArrowEngineData::new(result)))
}
pub(crate) fn parse_json_impl(
json_strings: &StringArray,
schema: ArrowSchemaRef,
) -> DeltaResult<RecordBatch> {
if json_strings.is_empty() {
return Ok(RecordBatch::new_empty(schema));
}
let mut decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(json_strings.len())
.with_coerce_primitive(true)
.build_decoder()?;
for (json, row_number) in json_strings.iter().zip(1..) {
let line = json.unwrap_or("{}");
let consumed = decoder.decode(line.as_bytes())?;
if consumed != line.len() || decoder.has_partial_record() {
return Err(Error::Generic(format!(
"Malformed JSON: Multiple, partial, or 0 JSON objects on row {row_number}"
)));
}
if decoder.len() != row_number {
return Err(Error::Generic(format!(
"Malformed JSON: Multiple, partial, or 0 JSON objects on row {row_number}"
)));
}
}
if let Some(batch) = decoder.flush()? {
if batch.num_rows() != json_strings.len() {
return Err(Error::Generic(format!(
"Unexpected number of rows decoded. Got {}, expected{}",
batch.num_rows(),
json_strings.len()
)));
}
return Ok(batch);
}
Err(Error::generic(
"Malformed JSON: exited parse_json_impl without deserializing anything useful",
))
}
pub(crate) fn filter_to_record_batch(
filtered_data: FilteredEngineData,
) -> DeltaResult<RecordBatch> {
let filtered = filtered_data.apply_selection_vector()?;
let arrow_data = ArrowEngineData::try_from_engine_data(filtered)?;
Ok((*arrow_data).into())
}
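// A JSON encoder that re-encodes map columns with `explicit_nulls` enabled, so map entries with
// null values are written as explicit `null`s rather than omitted.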
struct NullValueMapEncoder<'a> {
field: &'a ArrowFieldRef,
array: &'a MapArray,
}
impl<'a> Encoder for NullValueMapEncoder<'a> {
fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
let options = EncoderOptions::default().with_explicit_nulls(true);
#[allow(clippy::unwrap_used)]
let mut encoder = make_encoder(self.field, self.array, &options).unwrap();
encoder.encode(idx, out);
}
}
#[derive(Debug)]
struct NullValueMapEncoderFactory;
impl EncoderFactory for NullValueMapEncoderFactory {
fn make_default_encoder<'a>(
&self,
field: &'a ArrowFieldRef,
array: &'a dyn ArrowArray,
_options: &'a EncoderOptions,
) -> Result<Option<NullableEncoder<'a>>, crate::arrow::error::ArrowError> {
match array.data_type() {
ArrowDataType::Map(_, _) => {
let array = array.as_map();
let encoder = NullValueMapEncoder { field, array };
let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
let nulls = array.nulls().cloned();
Ok(Some(NullableEncoder::new(array_encoder, nulls)))
}
_ => Ok(None),
}
}
}
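/// Serializes each chunk of filtered engine data as newline-delimited JSON (using the map
/// encoder factory above so null map values are written explicitly) and returns the raw bytes.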
#[internal_api]
pub(crate) fn to_json_bytes(
data: impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send,
) -> DeltaResult<Vec<u8>> {
let builder = WriterBuilder::new().with_encoder_factory(Arc::new(NullValueMapEncoderFactory));
let mut writer = builder.build::<_, LineDelimited>(Vec::new());
for chunk in data {
let batch = filter_to_record_batch(chunk?)?;
writer.write(&batch)?;
}
writer.finish()?;
Ok(writer.into_inner())
}
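/// Post-processes a batch read from JSON: applies `reorder_indices` and materializes the file
/// path column from `file_location`.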
#[internal_api]
pub(crate) fn fixup_json_read(
batch: RecordBatch,
reorder_indices: &[ReorderIndex],
file_location: &str,
) -> DeltaResult<ArrowEngineData> {
let data = reorder_struct_array(batch.into(), reorder_indices, None, Some(file_location))?;
Ok(data.into())
}
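/// Builds reorder indices for data read from JSON: regular fields pass through unchanged
/// (identity), while metadata columns are appended at the end, materialized either as the file
/// path or as missing (null) columns.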
#[internal_api]
pub(crate) fn build_json_reorder_indices(schema: &StructType) -> DeltaResult<Vec<ReorderIndex>> {
let mut reorder_indices = Vec::with_capacity(schema.num_fields());
let mut metadata_entries = Vec::new();
for (output_pos, field) in schema.fields().enumerate() {
match field.get_metadata_column_spec() {
None => reorder_indices.push(ReorderIndex::identity(output_pos)),
Some(spec) => metadata_entries.push((output_pos, field, spec)),
}
}
for (output_pos, field, spec) in metadata_entries {
let field = Arc::new(field.try_into_arrow()?);
let rindex = match spec {
MetadataColumnSpec::FilePath => ReorderIndex::file_path(output_pos, field),
_ => ReorderIndex::missing(output_pos, field),
};
reorder_indices.push(rindex);
}
Ok(reorder_indices)
}
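/// Arrow schema containing only the non-metadata fields of `schema`, i.e. the fields that are
/// actually present in the JSON being read.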
#[internal_api]
pub(crate) fn json_arrow_schema(schema: &StructType) -> DeltaResult<ArrowSchema> {
let json_fields = schema.with_fields_filtered(|f| f.get_metadata_column_spec().is_none())?;
Ok(ArrowSchema::try_from_kernel(&json_fields)?)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::arrow::array::{
Array, ArrayRef as ArrowArrayRef, AsArray, BooleanArray, GenericListArray, Int32Array,
Int32Builder, Int64Array, MapArray, MapBuilder, StringArray, StringBuilder, StructArray,
StructBuilder,
};
use crate::arrow::buffer::{OffsetBuffer, ScalarBuffer};
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use crate::engine::arrow_conversion::TryIntoArrow;
use crate::schema::{
ArrayType, ColumnMetadataKey, DataType, MapType, MetadataColumnSpec, MetadataValue,
StructField, StructType,
};
use crate::table_features::ColumnMappingMode;
use crate::utils::test_utils::assert_result_error_with_message;
fn column_mapping_cases() -> [ColumnMappingMode; 3] {
[
ColumnMappingMode::Id,
ColumnMappingMode::Name,
ColumnMappingMode::None,
]
}
fn logical_name(field_id: i64) -> String {
format!("logical-{field_id}")
}
fn physical_name(field_id: i64) -> String {
format!("physical-{field_id}")
}
fn parquet_name(field_id: i64, mode: ColumnMappingMode) -> String {
match mode {
ColumnMappingMode::Id | ColumnMappingMode::Name => physical_name(field_id),
ColumnMappingMode::None => logical_name(field_id),
}
}
fn column_mapping_metadata(
field_id: i64,
mode: ColumnMappingMode,
) -> HashMap<String, MetadataValue> {
match mode {
ColumnMappingMode::None => HashMap::new(),
_ => kernel_fid_and_name(field_id, physical_name(field_id)),
}
}
fn arrow_fid(field_id: i64) -> HashMap<String, String> {
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string())])
}
fn kernel_fid_and_name(field_id: i64, name: impl AsRef<str>) -> HashMap<String, MetadataValue> {
HashMap::from([
(
ColumnMetadataKey::ColumnMappingId.as_ref().to_string(),
field_id.into(),
),
(
ColumnMetadataKey::ColumnMappingPhysicalName
.as_ref()
.to_string(),
name.as_ref().to_string().into(),
),
])
}
fn create_mock_row_group(num_rows: i64) -> RowGroupMetaData {
use crate::parquet::basic::{Encoding, Type as PhysicalType};
use crate::parquet::file::metadata::ColumnChunkMetaData;
use crate::parquet::schema::types::Type;
let schema = Arc::new(SchemaDescriptor::new(Arc::new(
Type::group_type_builder("schema")
.with_fields(vec![Arc::new(
Type::primitive_type_builder("test_col", PhysicalType::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap(),
)));
let column_chunk = ColumnChunkMetaData::builder(schema.column(0))
.set_encodings(vec![Encoding::PLAIN])
.set_total_compressed_size(100)
.set_total_uncompressed_size(100)
.set_num_values(num_rows)
.build()
.unwrap();
RowGroupMetaData::builder(schema)
.set_num_rows(num_rows)
.set_total_byte_size(100)
.set_column_metadata(vec![column_chunk])
.build()
.unwrap()
}
#[test]
fn test_json_parsing() {
static EXPECTED_JSON_ERR_STR: &str = "Generic delta kernel error: Malformed JSON: Multiple, partial, or 0 JSON objects on row";
fn check_parse_fails(
input: Vec<Option<&str>>,
schema: ArrowSchemaRef,
expected_start: &str,
) {
let result = parse_json_impl(&input.into(), schema);
let err = result.expect_err("Expected an error");
let msg = err.to_string();
            assert!(
                msg.starts_with(expected_start),
                "Error message was not what was expected: {msg}"
            );
}
let requested_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("a", ArrowDataType::Int32, true),
ArrowField::new("b", ArrowDataType::Utf8, true),
ArrowField::new("c", ArrowDataType::Int32, true),
]));
let input: Vec<&str> = vec![];
let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
assert_eq!(result.num_rows(), 0);
for input in [
vec![Some("")],
vec![Some(" \n\t")],
vec![Some(r#"{ "a": 1"#)],
vec![Some("{}{}")],
vec![Some(r#"{} { "a": 1"#)],
vec![Some(r#"{} { "a": 1"#), Some("}")],
vec![Some(r#"{ "a": 1"#), Some(r#", "b": "b"}"#)],
] {
check_parse_fails(input, requested_schema.clone(), EXPECTED_JSON_ERR_STR);
}
check_parse_fails(
vec![Some(r#""a""#)],
requested_schema.clone(),
"Json error: expected { got \"a\"",
);
let input: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1, "b": "2", "c": 3}"#), None];
let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
assert_eq!(result.num_rows(), 3);
assert_eq!(result.column(0).null_count(), 2);
assert_eq!(result.column(1).null_count(), 2);
assert_eq!(result.column(2).null_count(), 2);
}
#[test]
fn test_parse_json_with_long_strings() {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"long_val",
ArrowDataType::Utf8,
true,
)]));
let long_string = "a".repeat(1_000_000); let json_string = format!(r#"{{"long_val": "{long_string}"}}"#);
let input: Vec<Option<&str>> = vec![Some(&json_string)];
let batch = parse_json_impl(&input.into(), schema.clone()).unwrap();
assert_eq!(batch.num_rows(), 1);
let long_col = batch.column(0).as_string::<i32>();
assert_eq!(long_col.value(0), long_string);
}
#[test]
fn test_parse_json_impl_propagates_type_errors() {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
ArrowDataType::Decimal128(4, 2),
true,
)]));
let input: Vec<Option<&str>> = vec![Some(r#"{"a": 99999}"#)];
assert!(parse_json_impl(&input.into(), schema).is_err());
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
ArrowDataType::Int64,
true,
)]));
let input: Vec<Option<&str>> = vec![Some(r#"{"a": "not_a_number"}"#)];
assert!(parse_json_impl(&input.into(), schema).is_err());
}
#[test]
fn simple_mask_indices() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(0), DataType::INTEGER)
.with_metadata(column_mapping_metadata(0, mode)),
StructField::nullable(logical_name(1), DataType::STRING)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::nullable(logical_name(2), DataType::INTEGER)
.with_metadata(column_mapping_metadata(2, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(0, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(0)),
ArrowField::new(parquet_name(1, mode), ArrowDataType::Utf8, true)
.with_metadata(arrow_fid(1)),
ArrowField::new(parquet_name(2, mode), ArrowDataType::Int32, true)
.with_metadata(arrow_fid(2)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(1),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn test_variant_masks() {
fn unshredded_variant_parquet_schema() -> ArrowField {
ArrowField::new(
"v",
ArrowDataType::Struct(
vec![
ArrowField::new("metadata", ArrowDataType::Binary, false),
ArrowField::new("value", ArrowDataType::Binary, false),
]
.into(),
),
true,
)
}
fn shredded_variant_parquet_schema() -> ArrowField {
ArrowField::new(
"v",
ArrowDataType::Struct(
vec![
ArrowField::new("metadata", ArrowDataType::Binary, false),
ArrowField::new("value", ArrowDataType::Binary, true),
ArrowField::new("typed_value", ArrowDataType::Int32, true),
]
.into(),
),
true,
)
}
fn incorrect_variant_parquet_schema() -> ArrowField {
ArrowField::new(
"v",
ArrowDataType::Struct(
vec![
ArrowField::new("field1", ArrowDataType::Binary, false),
ArrowField::new("field2", ArrowDataType::Binary, false),
]
.into(),
),
true,
)
}
fn scalar_variant_parquet_schema() -> ArrowField {
ArrowField::new("v", ArrowDataType::Int16, true)
}
let requested_schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"v",
DataType::unshredded_variant(),
)]));
let unshredded_parquet_schema =
Arc::new(ArrowSchema::new(vec![unshredded_variant_parquet_schema()]));
let shredded_parquet_schema =
Arc::new(ArrowSchema::new(vec![shredded_variant_parquet_schema()]));
let incorrect_parquet_schema =
Arc::new(ArrowSchema::new(vec![incorrect_variant_parquet_schema()]));
let scalar_parquet_schema =
Arc::new(ArrowSchema::new(vec![scalar_variant_parquet_schema()]));
let result_unshredded =
get_requested_indices(&requested_schema, &unshredded_parquet_schema);
assert!(result_unshredded.is_ok());
let result_shredded = get_requested_indices(&requested_schema, &shredded_parquet_schema);
assert!(matches!(result_shredded,
Err(e) if e.to_string().contains("The default engine does not support shredded reads")));
let result_incorrect = get_requested_indices(&requested_schema, &incorrect_parquet_schema);
assert!(matches!(result_incorrect,
Err(e) if e.to_string().contains("The default engine does not support shredded reads")));
let result_scalar = get_requested_indices(&requested_schema, &scalar_parquet_schema);
assert!(matches!(result_scalar,
Err(e) if e.to_string().contains("The default engine does not support shredded reads")));
let requested_schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"struct_v",
StructType::new_unchecked([StructField::nullable("v", DataType::unshredded_variant())]),
)]));
let unshredded_parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"struct_v",
ArrowDataType::Struct(vec![unshredded_variant_parquet_schema()].into()),
true,
)]));
let shredded_parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"struct_v",
ArrowDataType::Struct(vec![shredded_variant_parquet_schema()].into()),
true,
)]));
let result_unshredded =
get_requested_indices(&requested_schema, &unshredded_parquet_schema);
let result_shredded = get_requested_indices(&requested_schema, &shredded_parquet_schema);
assert!(result_unshredded.is_ok());
assert!(matches!(result_shredded,
Err(e) if e.to_string().contains("The default engine does not support shredded reads")));
let requested_schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"array_v",
ArrayType::new(DataType::unshredded_variant(), true),
)]));
let unshredded_parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"array_v",
ArrowDataType::List(Arc::new(unshredded_variant_parquet_schema())),
true,
)]));
let shredded_parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"array_v",
ArrowDataType::List(Arc::new(shredded_variant_parquet_schema())),
true,
)]));
let result_unshredded =
get_requested_indices(&requested_schema, &unshredded_parquet_schema);
let result_shredded = get_requested_indices(&requested_schema, &shredded_parquet_schema);
assert!(result_unshredded.is_ok());
assert!(matches!(result_shredded,
Err(e) if e.to_string().contains("The default engine does not support shredded reads")));
let requested_schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"map_v",
MapType::new(DataType::STRING, DataType::unshredded_variant(), true),
)]));
let unshredded_parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new_map(
"map_v",
"struc_v",
ArrowField::new("s", ArrowDataType::Utf8, false),
unshredded_variant_parquet_schema(),
false,
false,
)]));
let shredded_parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new_map(
"map_v",
"struc_v",
ArrowField::new("s", ArrowDataType::Utf8, false),
shredded_variant_parquet_schema(),
false,
false,
)]));
let result_unshredded =
get_requested_indices(&requested_schema, &unshredded_parquet_schema);
let result_shredded = get_requested_indices(&requested_schema, &shredded_parquet_schema);
assert!(result_unshredded.is_ok());
assert!(matches!(result_shredded,
Err(e) if e.to_string().contains("The default engine does not support shredded reads")));
}
#[test]
fn ensure_data_types_fails_correctly() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(0), DataType::INTEGER)
.with_metadata(column_mapping_metadata(0, mode)),
StructField::nullable(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(0, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(0)),
ArrowField::new(parquet_name(1, mode), ArrowDataType::Utf8, true)
.with_metadata(arrow_fid(1)),
]));
let res = get_requested_indices(&requested_schema, &parquet_schema);
assert_result_error_with_message(
res,
"Invalid argument error: Incorrect datatype. Expected integer, got Utf8",
);
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(0), DataType::INTEGER)
.with_metadata(column_mapping_metadata(0, mode)),
StructField::nullable(logical_name(1), DataType::STRING)
.with_metadata(column_mapping_metadata(1, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(0, mode), ArrowDataType::Int32, false),
ArrowField::new(parquet_name(1, mode), ArrowDataType::Int32, true),
]));
let res = get_requested_indices(&requested_schema, &parquet_schema);
assert_result_error_with_message(
res,
"Invalid argument error: Incorrect datatype. Expected Utf8, got Int32",
);
})
}
#[test]
fn mask_with_map() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([StructField::not_null(
logical_name(0),
MapType::new(DataType::INTEGER, DataType::STRING, false),
)
.with_metadata(column_mapping_metadata(0, mode))])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new_map(
parquet_name(0, mode),
"entries",
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("s", ArrowDataType::Utf8, false),
false,
false,
)
.with_metadata(arrow_fid(1))]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let expect_reorder = vec![ReorderIndex::identity(0)];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn simple_reorder_indices() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(0), DataType::INTEGER)
.with_metadata(column_mapping_metadata(0, mode)),
StructField::nullable(logical_name(1), DataType::STRING)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::nullable(logical_name(2), DataType::INTEGER)
.with_metadata(column_mapping_metadata(2, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(2, mode), ArrowDataType::Int32, true)
.with_metadata(arrow_fid(2)),
ArrowField::new(parquet_name(0, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(0)),
ArrowField::new(parquet_name(1, mode), ArrowDataType::Utf8, true)
.with_metadata(arrow_fid(1)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2];
let expect_reorder = vec![
ReorderIndex::identity(2),
ReorderIndex::identity(0),
ReorderIndex::identity(1),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
})
}
#[test]
fn simple_nullable_field_missing() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(0), DataType::INTEGER)
.with_metadata(column_mapping_metadata(0, mode)),
StructField::nullable(logical_name(1), DataType::STRING)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::nullable(logical_name(2), DataType::INTEGER)
.with_metadata(column_mapping_metadata(2, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(0, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(0)),
ArrowField::new(parquet_name(2, mode), ArrowDataType::Int32, true)
.with_metadata(arrow_fid(2)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let expected_arrow_field = requested_schema
.field(parquet_name(1, mode))
.unwrap()
.try_into_arrow()
.unwrap();
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::missing(1, Arc::new(expected_arrow_field)),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn get_requested_indices_by_id_only() {
let requested_schema = StructType::new_unchecked([
StructField::not_null("i_logical", DataType::INTEGER)
.with_metadata(kernel_fid_and_name(1, "i_physical")),
StructField::nullable("s_logical", DataType::STRING)
.with_metadata(kernel_fid_and_name(2, "s_physical")),
StructField::nullable("i2_logical", DataType::INTEGER)
.with_metadata(kernel_fid_and_name(3, "i2_physical")),
])
.make_physical(ColumnMappingMode::Id)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("not-i", ArrowDataType::Int32, false).with_metadata(arrow_fid(1)),
ArrowField::new("not-i2", ArrowDataType::Int32, true).with_metadata(arrow_fid(3)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let expected_arrow_field = requested_schema
.field("s_physical")
.unwrap()
.try_into_arrow()
.unwrap();
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::missing(1, Arc::new(expected_arrow_field)),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
#[test]
fn get_requested_indices_by_id_falls_back_to_name() {
let requested_schema = StructType::new_unchecked([
StructField::not_null("i_logical", DataType::INTEGER)
.with_metadata(kernel_fid_and_name(1, "i_physical")),
StructField::nullable("s_logical", DataType::STRING)
.with_metadata(kernel_fid_and_name(2, "s_physical")),
StructField::nullable("i2_logical", DataType::INTEGER)
.with_metadata(kernel_fid_and_name(3, "i2_physical")),
])
.make_physical(ColumnMappingMode::Id)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i_logical", ArrowDataType::Int32, false).with_metadata(arrow_fid(1)),
ArrowField::new("i2_physical", ArrowDataType::Int32, true).with_metadata(arrow_fid(3)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let expected_arrow_field = requested_schema
.field("s_physical")
.unwrap()
.try_into_arrow()
.unwrap();
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::missing(1, Arc::new(expected_arrow_field)),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
fn nested_parquet_schema(mode: ColumnMappingMode) -> ArrowSchemaRef {
Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(1, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(1)),
ArrowField::new(
parquet_name(3, mode),
ArrowDataType::Struct(
vec![
ArrowField::new(parquet_name(4, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(4)),
ArrowField::new(parquet_name(5, mode), ArrowDataType::Utf8, false)
.with_metadata(arrow_fid(5)),
]
.into(),
),
false,
)
.with_metadata(arrow_fid(3)),
ArrowField::new(parquet_name(2, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(2)),
]))
}
#[test]
fn test_match_parquet_fields_filters_metadata_columns() {
let kernel_schema = StructType::new_unchecked([
StructField::not_null("regular_field", DataType::INTEGER),
StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
StructField::nullable("another_field", DataType::STRING),
]);
let parquet_fields: ArrowFields = vec![
ArrowField::new("regular_field", ArrowDataType::Int32, false),
ArrowField::new("row_index", ArrowDataType::Int64, false),
ArrowField::new("another_field", ArrowDataType::Utf8, true),
]
.into();
let matched_fields: Vec<_> =
match_parquet_fields(&kernel_schema, &parquet_fields).collect();
assert_eq!(matched_fields.len(), 3);
assert!(matched_fields[0].kernel_field_info.is_some());
assert_eq!(matched_fields[0].parquet_field.name(), "regular_field");
assert!(matched_fields[1].kernel_field_info.is_none());
assert_eq!(matched_fields[1].parquet_field.name(), "row_index");
assert!(matched_fields[2].kernel_field_info.is_some());
assert_eq!(matched_fields[2].parquet_field.name(), "another_field");
}
#[test]
fn test_ordering_needs_row_indexes() {
let ordering_no_row_index = vec![
ReorderIndex::identity(0),
ReorderIndex::cast(1, ArrowDataType::Int64),
ReorderIndex::missing(
2,
Arc::new(ArrowField::new("missing", ArrowDataType::Utf8, true)),
),
];
assert!(!ordering_needs_row_indexes(&ordering_no_row_index));
let ordering_with_row_index = vec![
ReorderIndex::identity(0),
ReorderIndex::row_index(
1,
Arc::new(ArrowField::new("row_idx", ArrowDataType::Int64, false)),
),
];
assert!(ordering_needs_row_indexes(&ordering_with_row_index));
assert!(!ordering_needs_row_indexes(&[]));
}
#[test]
fn test_reorder_struct_array_missing_row_indexes() {
let arry = make_struct_array();
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::row_index(
1,
Arc::new(ArrowField::new("row_idx", ArrowDataType::Int64, false)),
),
];
let result = reorder_struct_array(arry, &reorder, None, None);
assert_result_error_with_message(
result,
"Row index column requested but row index iterator not provided",
);
}
#[test]
fn test_reorder_struct_array_with_row_indexes() {
let arry = make_struct_array();
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::row_index(
1,
Arc::new(ArrowField::new("row_idx", ArrowDataType::Int64, false)),
),
];
#[allow(clippy::single_range_in_vec_init)]
let mut row_indexes = vec![(0..4)].into_iter().flatten();
let ordered = reorder_struct_array(arry, &reorder, Some(&mut row_indexes), None).unwrap();
assert_eq!(ordered.column_names(), vec!["b", "row_idx"]);
let row_idx_col = ordered.column(1).as_primitive::<Int64Type>();
assert_eq!(row_idx_col.values(), &[0, 1, 2, 3]);
}
#[test]
fn simple_row_index_field() {
let requested_schema = Arc::new(StructType::new_unchecked([
StructField::not_null("i", DataType::INTEGER),
StructField::create_metadata_column("my_row_index", MetadataColumnSpec::RowIndex),
StructField::nullable("i2", DataType::INTEGER),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let mut arrow_row_index_field =
ArrowField::new("my_row_index", ArrowDataType::Int64, false);
arrow_row_index_field.set_metadata(HashMap::from([(
"delta.metadataSpec".to_string(),
"row_index".to_string(),
)]));
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::row_index(1, Arc::new(arrow_row_index_field)),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
#[test]
fn simple_file_path_field() {
let requested_schema = Arc::new(StructType::new_unchecked([
StructField::not_null("i", DataType::INTEGER),
StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
StructField::nullable("i2", DataType::INTEGER),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let mut arrow_file_path_field = ArrowField::new("_file", ArrowDataType::Utf8, false);
arrow_file_path_field.set_metadata(HashMap::from([(
"delta.metadataSpec".to_string(),
"_file".to_string(),
)]));
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::file_path(1, Arc::new(arrow_file_path_field)),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
#[test]
fn test_reorder_struct_array_with_file_path() {
let arry = make_struct_array();
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::file_path(
1,
Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)),
),
];
let file_location = "s3://bucket/path/to/file.parquet";
let ordered = reorder_struct_array(arry, &reorder, None, Some(file_location)).unwrap();
assert_eq!(ordered.column_names(), vec!["b", "_file"]);
let file_path_col = ordered.column(1);
let string_array = file_path_col
.as_any()
.downcast_ref::<StringArray>()
.expect("Expected StringArray");
assert_eq!(string_array.len(), 4);
assert!(string_array.iter().all(|v| v == Some(file_location)));
}
#[test]
fn test_reorder_struct_array_missing_file_path() {
let arry = make_struct_array();
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::file_path(
1,
Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)),
),
];
let result = reorder_struct_array(arry, &reorder, None, None);
assert_result_error_with_message(
result,
"File path column requested but file location not provided",
);
}
#[test]
fn test_row_index_builder_no_skipping() {
let row_groups = vec![
create_mock_row_group(5),
create_mock_row_group(3),
create_mock_row_group(4),
];
let builder = RowIndexBuilder::new(&row_groups);
let row_indexes: Vec<i64> = builder.build().unwrap().collect();
assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}
#[test]
fn test_row_index_builder_with_skipping() {
let row_groups = vec![
create_mock_row_group(5),
create_mock_row_group(3),
create_mock_row_group(4),
create_mock_row_group(2),
];
let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[0, 2]);
let row_indexes: Vec<i64> = builder.build().unwrap().collect();
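// Row group 1 (3 rows) is skipped, so the indexes jump from 4 straight to 8.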
assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 8, 9, 10, 11]);
}
#[test]
fn test_row_index_builder_single_row_group() {
let row_groups = vec![create_mock_row_group(7)];
let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[0]);
let row_indexes: Vec<i64> = builder.build().unwrap().collect();
assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6]);
}
#[test]
fn test_row_index_builder_empty_selection() {
let row_groups = vec![create_mock_row_group(3), create_mock_row_group(2)];
let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[]);
let row_indexes: Vec<i64> = builder.build().unwrap().collect();
assert_eq!(row_indexes, Vec::<i64>::new());
}
#[test]
fn test_row_index_builder_out_of_order_selection() {
let row_groups = vec![
create_mock_row_group(2),
create_mock_row_group(3),
create_mock_row_group(1),
];
let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[2, 0]);
let row_indexes: Vec<i64> = builder.build().unwrap().collect();
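// Ordinals are honored in the order given: group 2 contributes index 5 first, then group 0 contributes 0..2.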
assert_eq!(row_indexes, vec![5, 0, 1]);
}
#[test]
fn test_row_index_builder_out_of_bounds_row_group_ordinals() {
let row_groups = vec![create_mock_row_group(2)];
let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[1]);
let result = builder.build();
assert_result_error_with_message(result, "Row group ordinal 1 is out of bounds");
}
#[test]
fn test_row_index_builder_duplicate_row_group_ordinals() {
let row_groups = vec![create_mock_row_group(2), create_mock_row_group(3)];
let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[1, 1]);
let result = builder.build();
assert_result_error_with_message(result, "Found duplicate row group ordinal");
}
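// The schema-matching tests below run once per column mapping mode returned by column_mapping_cases().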
#[test]
fn nested_indices() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::not_null(
logical_name(3),
StructType::new_unchecked([
StructField::not_null(logical_name(4), DataType::INTEGER)
.with_metadata(column_mapping_metadata(4, mode)),
StructField::not_null(logical_name(5), DataType::STRING)
.with_metadata(column_mapping_metadata(5, mode)),
]),
)
.with_metadata(column_mapping_metadata(3, mode)),
StructField::not_null(logical_name(2), DataType::INTEGER)
.with_metadata(column_mapping_metadata(2, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = nested_parquet_schema(mode);
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2, 3];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn nested_indices_reorder() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(
logical_name(3),
StructType::new_unchecked([
StructField::not_null(logical_name(5), DataType::STRING)
.with_metadata(column_mapping_metadata(5, mode)),
StructField::not_null(logical_name(4), DataType::INTEGER)
.with_metadata(column_mapping_metadata(4, mode)),
]),
)
.with_metadata(column_mapping_metadata(3, mode)),
StructField::not_null(logical_name(2), DataType::INTEGER)
.with_metadata(column_mapping_metadata(2, mode)),
StructField::not_null(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = nested_parquet_schema(mode);
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2, 3];
let expect_reorder = vec![
ReorderIndex::identity(2),
ReorderIndex::nested(
0,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
),
ReorderIndex::identity(1),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn nested_indices_mask_inner() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::not_null(
logical_name(3),
StructType::new_unchecked([StructField::not_null(
logical_name(4),
DataType::INTEGER,
)
.with_metadata(column_mapping_metadata(4, mode))]),
)
.with_metadata(column_mapping_metadata(3, mode)),
StructField::not_null(logical_name(2), DataType::INTEGER)
.with_metadata(column_mapping_metadata(2, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = nested_parquet_schema(mode);
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 3];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(1, vec![ReorderIndex::identity(0)]),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
})
}
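// "stats" exists in parquet, but none of its requested children match, so only "a" (leaf index 1)
// is masked in and "stats" is reported as missing.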
#[test]
fn unmatched_struct_before_selected_leaf_ordering() {
let requested_schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::nullable("a", DataType::LONG),
StructField::nullable(
"stats",
StructType::new_unchecked([StructField::nullable("age", DataType::LONG)]),
),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(
"stats",
ArrowDataType::Struct(
vec![ArrowField::new("id", ArrowDataType::Int64, true)].into(),
),
true,
),
ArrowField::new("a", ArrowDataType::Int64, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
assert_eq!(mask_indices, vec![1]);
let expected_stats_field = Arc::new(
requested_schema
.field("stats")
.unwrap()
.try_into_arrow()
.unwrap(),
);
assert_eq!(
reorder_indices,
vec![
ReorderIndex::identity(0),
ReorderIndex::missing(1, expected_stats_field),
]
);
}
#[test]
fn simple_list_mask() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::not_null(logical_name(2), ArrayType::new(DataType::INTEGER, false))
.with_metadata(column_mapping_metadata(2, mode)),
StructField::not_null(logical_name(3), DataType::INTEGER)
.with_metadata(column_mapping_metadata(3, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(1, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(1)),
ArrowField::new(
parquet_name(2, mode),
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Int32,
false,
))),
false,
)
.with_metadata(arrow_fid(2)),
ArrowField::new(parquet_name(3, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(3)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(1),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn list_skip_earlier_element() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([StructField::not_null(
logical_name(1),
ArrayType::new(DataType::INTEGER, false),
)
.with_metadata(column_mapping_metadata(1, mode))])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(0, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(0)),
ArrowField::new(
parquet_name(1, mode),
ArrowDataType::List(Arc::new(
ArrowField::new(parquet_name(2, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(2)),
)),
false,
)
.with_metadata(arrow_fid(1)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![1];
let expect_reorder = vec![ReorderIndex::identity(0)];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn nested_indices_list() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(0), DataType::INTEGER)
.with_metadata(column_mapping_metadata(0, mode)),
StructField::not_null(
logical_name(1),
ArrayType::new(
StructType::new_unchecked([
StructField::not_null(logical_name(3), DataType::INTEGER)
.with_metadata(column_mapping_metadata(3, mode)),
StructField::not_null(logical_name(4), DataType::STRING)
.with_metadata(column_mapping_metadata(4, mode)),
])
.into(),
false,
),
)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::not_null(logical_name(2), DataType::INTEGER)
.with_metadata(column_mapping_metadata(2, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(0, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(0)),
ArrowField::new(
parquet_name(1, mode),
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new(parquet_name(3, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(3)),
ArrowField::new(parquet_name(4, mode), ArrowDataType::Utf8, false)
.with_metadata(arrow_fid(4)),
]
.into(),
),
false,
))),
false,
)
.with_metadata(arrow_fid(1)),
ArrowField::new(parquet_name(2, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(2)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2, 3];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn nested_indices_unselected_list() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::not_null(logical_name(3), DataType::INTEGER)
.with_metadata(column_mapping_metadata(3, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(1, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(1)),
ArrowField::new(
parquet_name(2, mode),
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new(parquet_name(4, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(4)),
ArrowField::new(parquet_name(5, mode), ArrowDataType::Utf8, false)
.with_metadata(arrow_fid(5)),
]
.into(),
),
false,
))),
false,
)
.with_metadata(arrow_fid(2)),
ArrowField::new(parquet_name(3, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(3)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 3];
let expect_reorder = vec![ReorderIndex::identity(0), ReorderIndex::identity(1)];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn nested_indices_list_mask_inner() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::not_null(
logical_name(2),
ArrayType::new(
StructType::new_unchecked([StructField::not_null(
logical_name(4),
DataType::INTEGER,
)
.with_metadata(column_mapping_metadata(4, mode))])
.into(),
false,
),
)
.with_metadata(column_mapping_metadata(2, mode)),
StructField::not_null(logical_name(3), DataType::INTEGER)
.with_metadata(column_mapping_metadata(3, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(1, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(1)),
ArrowField::new(
parquet_name(2, mode),
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new(parquet_name(4, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(4)),
ArrowField::new(parquet_name(5, mode), ArrowDataType::Utf8, false)
.with_metadata(arrow_fid(5)),
]
.into(),
),
false,
))),
false,
)
.with_metadata(arrow_fid(2)),
ArrowField::new(parquet_name(3, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(3)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 3];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(1, vec![ReorderIndex::identity(0)]),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn nested_indices_list_mask_inner_reorder() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::not_null(
logical_name(2),
ArrayType::new(
StructType::new_unchecked([
StructField::not_null(logical_name(6), DataType::STRING)
.with_metadata(column_mapping_metadata(6, mode)),
StructField::not_null(logical_name(5), DataType::INTEGER)
.with_metadata(column_mapping_metadata(5, mode)),
])
.into(),
false,
),
)
.with_metadata(column_mapping_metadata(2, mode)),
StructField::not_null(logical_name(3), DataType::INTEGER)
.with_metadata(column_mapping_metadata(3, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(parquet_name(1, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(1)),
ArrowField::new(
parquet_name(2, mode),
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new(parquet_name(4, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(4)),
ArrowField::new(parquet_name(5, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(5)),
ArrowField::new(parquet_name(6, mode), ArrowDataType::Utf8, false)
.with_metadata(arrow_fid(6)),
]
.into(),
),
false,
))),
false,
)
.with_metadata(arrow_fid(2)),
ArrowField::new(parquet_name(3, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(3)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 2, 3, 4];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn skipped_struct() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::not_null(logical_name(1), DataType::INTEGER)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::not_null(
logical_name(2),
StructType::new_unchecked([
StructField::not_null(logical_name(4), DataType::INTEGER)
.with_metadata(column_mapping_metadata(4, mode)),
StructField::not_null(logical_name(5), DataType::STRING)
.with_metadata(column_mapping_metadata(5, mode)),
]),
)
.with_metadata(column_mapping_metadata(2, mode)),
StructField::not_null(logical_name(3), DataType::INTEGER)
.with_metadata(column_mapping_metadata(3, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(
"skipped",
ArrowDataType::Struct(
vec![
ArrowField::new(parquet_name(7, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(7)),
ArrowField::new(parquet_name(8, mode), ArrowDataType::Utf8, false)
.with_metadata(arrow_fid(8)),
]
.into(),
),
false,
)
.with_metadata(arrow_fid(6)),
ArrowField::new(parquet_name(3, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(3)),
ArrowField::new(
parquet_name(2, mode),
ArrowDataType::Struct(
vec![
ArrowField::new(parquet_name(4, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(4)),
ArrowField::new(parquet_name(5, mode), ArrowDataType::Utf8, false)
.with_metadata(arrow_fid(5)),
]
.into(),
),
false,
)
.with_metadata(arrow_fid(2)),
ArrowField::new(parquet_name(1, mode), ArrowDataType::Int32, false)
.with_metadata(arrow_fid(1)),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![2, 3, 4, 5];
let expect_reorder = vec![
ReorderIndex::identity(2),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
),
ReorderIndex::identity(0),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
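// Map keys already match the requested order, but the value struct is requested as {v2, v1},
// so its children must swap.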
#[test]
fn reorder_map_with_structs() {
let requested_schema = Arc::new(StructType::new_unchecked([
StructField::not_null("i", DataType::INTEGER),
StructField::not_null(
"map",
MapType::new(
StructType::new_unchecked([
StructField::not_null("k1", DataType::STRING),
StructField::not_null("k2", DataType::STRING),
]),
StructType::new_unchecked([
StructField::not_null("v2", DataType::STRING),
StructField::not_null("v1", DataType::STRING),
]),
false,
),
),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new_map(
"map",
"entries",
ArrowField::new(
"i",
ArrowDataType::Struct(
vec![
ArrowField::new("k1", ArrowDataType::Utf8, false),
ArrowField::new("k2", ArrowDataType::Utf8, false),
]
.into(),
),
false,
),
ArrowField::new(
"v",
ArrowDataType::Struct(
vec![
ArrowField::new("v1", ArrowDataType::Utf8, false),
ArrowField::new("v2", ArrowDataType::Utf8, false),
]
.into(),
),
false,
),
false,
false,
),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2, 3, 4];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
),
],
),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
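// Four-row struct with boolean column "b" and int column "c", shared by the reorder tests.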
fn make_struct_array() -> StructArray {
let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
StructArray::from(vec![
(
Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
boolean.clone() as ArrowArrayRef,
),
(
Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
int.clone() as ArrowArrayRef,
),
])
}
#[test]
fn simple_reorder_struct() {
let arry = make_struct_array();
let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)];
let ordered = reorder_struct_array(arry, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["c", "b"]);
}
#[test]
fn nested_reorder_struct() {
let arry1 = Arc::new(make_struct_array());
let arry2 = Arc::new(make_struct_array());
let fields: ArrowFields = vec![
Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
]
.into();
let nested = StructArray::from(vec![
(
Arc::new(ArrowField::new(
"struct1",
ArrowDataType::Struct(fields.clone()),
false,
)),
arry1 as ArrowArrayRef,
),
(
Arc::new(ArrowField::new(
"struct2",
ArrowDataType::Struct(fields),
false,
)),
arry2 as ArrowArrayRef,
),
]);
let reorder = vec![
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
),
ReorderIndex::nested(
0,
vec![
ReorderIndex::identity(0),
ReorderIndex::identity(1),
ReorderIndex::missing(
2,
Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true)),
),
],
),
];
let ordered = reorder_struct_array(nested, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]);
let ordered_s2 = ordered.column(0).as_struct();
assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]);
let ordered_s1 = ordered.column(1).as_struct();
assert_eq!(ordered_s1.column_names(), vec!["c", "b"]);
}
#[test]
fn reorder_list_of_struct() {
let boolean = Arc::new(BooleanArray::from(vec![
false, false, true, true, false, true,
]));
let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31, 0, 3]));
let list_sa = StructArray::from(vec![
(
Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
boolean.clone() as ArrowArrayRef,
),
(
Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
int.clone() as ArrowArrayRef,
),
]);
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6]));
let list_field = ArrowField::new("item", list_sa.data_type().clone(), false);
let list = Arc::new(GenericListArray::new(
Arc::new(list_field),
offsets,
Arc::new(list_sa),
None,
));
let fields: ArrowFields = vec![
Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
]
.into();
let list_dt = Arc::new(ArrowField::new(
"list",
ArrowDataType::new_list(ArrowDataType::Struct(fields), false),
false,
));
let struct_array = StructArray::from(vec![(list_dt, list as ArrowArrayRef)]);
let reorder = vec![ReorderIndex::nested(
0,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
)];
let ordered = reorder_struct_array(struct_array, &reorder, None, None).unwrap();
let ordered_list_col = ordered.column(0).as_list::<i32>();
for i in 0..ordered_list_col.len() {
let array_item = ordered_list_col.value(i);
let struct_item = array_item.as_struct();
assert_eq!(struct_item.column_names(), vec!["c", "b"]);
}
}
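// Builds a single-entry map whose key and value are both two-field structs, for the map reorder test below.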
fn build_arrow_map() -> MapArray {
let key_struct_builder = StructBuilder::from_fields(
ArrowFields::from(vec![
ArrowField::new("k1", ArrowDataType::Int32, false),
ArrowField::new("k2", ArrowDataType::Int32, false),
]),
1,
);
let value_struct_builder = StructBuilder::from_fields(
ArrowFields::from(vec![
ArrowField::new("v1", ArrowDataType::Int32, false),
ArrowField::new("v2", ArrowDataType::Int32, false),
]),
1,
);
let mut map_builder = MapBuilder::new(None, key_struct_builder, value_struct_builder);
let (key_builder, value_builder) = map_builder.entries();
let key_k1_builder = key_builder.field_builder::<Int32Builder>(0).unwrap();
key_k1_builder.append_value(1);
let key_k2_builder = key_builder.field_builder::<Int32Builder>(1).unwrap();
key_k2_builder.append_value(2);
key_builder.append(true);
let value_v1_builder = value_builder.field_builder::<Int32Builder>(0).unwrap();
value_v1_builder.append_value(1);
let value_v2_builder = value_builder.field_builder::<Int32Builder>(1).unwrap();
value_v2_builder.append_value(2);
value_builder.append(true);
map_builder.append(true).unwrap();
map_builder.finish()
}
#[test]
fn reorder_map_of_struct() {
let int_array = Arc::new(Int32Array::from(vec![42]));
let int_dt = Arc::new(ArrowField::new("i", int_array.data_type().clone(), false));
let map_array = Arc::new(build_arrow_map());
let map_dt = Arc::new(ArrowField::new("map", map_array.data_type().clone(), false));
let struct_array = StructArray::from(vec![
(int_dt, int_array as ArrowArrayRef),
(map_dt, map_array as ArrowArrayRef),
]);
let reorder = vec![
ReorderIndex::identity(1),
ReorderIndex::nested(
0,
vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
),
],
),
];
let ordered = reorder_struct_array(struct_array, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["map", "i"]);
if let ArrowDataType::Map(field, _) = ordered.column(0).data_type() {
if let ArrowDataType::Struct(fields) = field.data_type() {
fn assert_col_order(field: &ArrowField, expected: Vec<&str>) {
if let ArrowDataType::Struct(fields) = field.data_type() {
let names: Vec<&str> =
fields.iter().map(|field| field.name().as_str()).collect();
assert_eq!(names, expected);
} else {
panic!("Expected struct field");
}
}
assert_col_order(&fields[0], vec!["k1", "k2"]);
assert_col_order(&fields[1], vec!["v2", "v1"]);
} else {
panic!("Inner field should have been a struct");
}
} else {
panic!("Column 0 should have been a map");
}
}
#[test]
fn no_matches() {
column_mapping_cases().into_iter().for_each(|mode| {
let requested_schema = StructType::new_unchecked([
StructField::nullable(logical_name(1), DataType::STRING)
.with_metadata(column_mapping_metadata(1, mode)),
StructField::nullable(logical_name(2), DataType::INTEGER)
.with_metadata(column_mapping_metadata(2, mode)),
])
.make_physical(mode)
.unwrap()
.into();
let nots_field =
ArrowField::new("NOTs", ArrowDataType::Utf8, true).with_metadata(arrow_fid(3));
let noti2_field =
ArrowField::new("NOTi2", ArrowDataType::Int32, true).with_metadata(arrow_fid(4));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
nots_field.clone(),
noti2_field.clone(),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask: Vec<usize> = vec![];
let mut fields = requested_schema.fields();
let expected_field1: Arc<ArrowField> =
Arc::new(fields.next().unwrap().try_into_arrow().unwrap());
let expected_field2: Arc<ArrowField> =
Arc::new(fields.next().unwrap().try_into_arrow().unwrap());
let expect_reorder = vec![
ReorderIndex::missing(0, expected_field1),
ReorderIndex::missing(1, expected_field2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
});
}
#[test]
fn empty_requested_schema() {
let requested_schema = Arc::new(StructType::new_unchecked([]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("s", ArrowDataType::Utf8, true),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask: Vec<usize> = vec![];
let expect_reorder = vec![];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
#[test]
fn test_write_json() -> DeltaResult<()> {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"string",
ArrowDataType::Utf8,
true,
)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
let filtered_data = FilteredEngineData::with_all_rows_selected(data);
let json = to_json_bytes(Box::new(std::iter::once(Ok(filtered_data))))?;
assert_eq!(
json,
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
);
Ok(())
}
#[test]
fn test_to_json_bytes_filters_data() -> DeltaResult<()> {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"value",
ArrowDataType::Utf8,
true,
)]));
let record_batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(vec![
"row0", "row1", "row2", "row3",
]))],
)?;
let create_engine_data =
|| -> Box<dyn EngineData> { Box::new(ArrowEngineData::new(record_batch.clone())) };
let all_selected =
FilteredEngineData::try_new(create_engine_data(), vec![true, true, true, true])?;
let json_all = to_json_bytes(Box::new(std::iter::once(Ok(all_selected))))?;
assert_eq!(
json_all,
"{\"value\":\"row0\"}\n{\"value\":\"row1\"}\n{\"value\":\"row2\"}\n{\"value\":\"row3\"}\n".as_bytes()
);
let partial_selected =
FilteredEngineData::try_new(create_engine_data(), vec![true, false, false, true])?;
let json_partial = to_json_bytes(Box::new(std::iter::once(Ok(partial_selected))))?;
assert_eq!(
json_partial,
"{\"value\":\"row0\"}\n{\"value\":\"row3\"}\n".as_bytes()
);
let middle_selected =
FilteredEngineData::try_new(create_engine_data(), vec![false, true, true, false])?;
let json_middle = to_json_bytes(Box::new(std::iter::once(Ok(middle_selected))))?;
assert_eq!(
json_middle,
"{\"value\":\"row1\"}\n{\"value\":\"row2\"}\n".as_bytes()
);
let none_selected =
FilteredEngineData::try_new(create_engine_data(), vec![false, false, false, false])?;
let json_none = to_json_bytes(Box::new(std::iter::once(Ok(none_selected))))?;
assert_eq!(json_none, "".as_bytes());
let one_selected =
FilteredEngineData::try_new(create_engine_data(), vec![false, true, false, false])?;
let json_one = to_json_bytes(Box::new(std::iter::once(Ok(one_selected))))?;
assert_eq!(json_one, "{\"value\":\"row1\"}\n".as_bytes());
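// A selection vector shorter than the batch leaves the uncovered trailing rows selected,
// so only "row3" is emitted here.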
let one_selected =
FilteredEngineData::try_new(create_engine_data(), vec![false, false, false])?;
let json_one = to_json_bytes(Box::new(std::iter::once(Ok(one_selected))))?;
assert_eq!(json_one, "{\"value\":\"row3\"}\n".as_bytes());
Ok(())
}
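// Round-trips nested nullable structs through parquet and checks that fix_nested_null_masks
// restores the same child null buffers the JSON reader produced.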
#[test]
fn test_arrow_broken_nested_null_masks() {
use crate::arrow::datatypes::{DataType, Field, Schema};
use crate::engine::arrow_utils::fix_nested_null_masks;
use crate::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
let schema = Arc::new(Schema::new(vec![Field::new(
"outer",
DataType::Struct(ArrowFields::from(vec![
Field::new(
"inner_nullable",
DataType::Struct(ArrowFields::from(vec![
Field::new("leaf_non_null", DataType::Int32, false),
Field::new("leaf_nullable", DataType::Int32, true),
])),
true,
),
Field::new(
"inner_non_null",
DataType::Struct(ArrowFields::from(vec![
Field::new("leaf_non_null", DataType::Int32, false),
Field::new("leaf_nullable", DataType::Int32, true),
])),
false,
),
])),
true,
)]));
let json_string = r#"
{ }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 1 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 2, "leaf_nullable" : 3 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 4 }, "inner_nullable" : { "leaf_non_null" : 5 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 6 }, "inner_nullable" : { "leaf_non_null" : 7, "leaf_nullable": 8 } } }
"#;
let batch1 = crate::arrow::json::ReaderBuilder::new(schema.clone())
.build(json_string.as_bytes())
.unwrap()
.next()
.unwrap()
.unwrap();
macro_rules! assert_nulls {
( $column: expr, $nulls: expr ) => {
assert_eq!($column.nulls().unwrap(), &NullBuffer::from(&$nulls[..]));
};
}
let outer_1 = batch1.column(0).as_struct();
assert_nulls!(outer_1, [false, true, true, true, true]);
let inner_nullable_1 = outer_1.column(0).as_struct();
assert_nulls!(inner_nullable_1, [false, false, false, true, true]);
let nullable_leaf_non_null_1 = inner_nullable_1.column(0);
assert_nulls!(nullable_leaf_non_null_1, [false, false, false, true, true]);
let nullable_leaf_nullable_1 = inner_nullable_1.column(1);
assert_nulls!(nullable_leaf_nullable_1, [false, false, false, false, true]);
let inner_non_null_1 = outer_1.column(1).as_struct();
assert_nulls!(inner_non_null_1, [false, true, true, true, true]);
let non_null_leaf_non_null_1 = inner_non_null_1.column(0);
assert_nulls!(non_null_leaf_non_null_1, [false, true, true, true, true]);
let non_null_leaf_nullable_1 = inner_non_null_1.column(1);
assert_nulls!(non_null_leaf_nullable_1, [false, false, true, false, false]);
let mut buffer = vec![];
let mut writer =
crate::parquet::arrow::ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
writer.write(&batch1).unwrap();
writer.close().unwrap();
let batch2 = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(buffer))
.unwrap()
.build()
.unwrap()
.next()
.unwrap()
.unwrap();
let batch2 = RecordBatch::from(fix_nested_null_masks(batch2.into()));
let outer_2 = batch2.column(0).as_struct();
assert_eq!(outer_2, outer_1);
let inner_nullable_2 = outer_2.column(0).as_struct();
assert_eq!(inner_nullable_2, inner_nullable_1);
let nullable_leaf_non_null_2 = inner_nullable_2.column(0);
assert_eq!(nullable_leaf_non_null_2, nullable_leaf_non_null_1);
let nullable_leaf_nullable_2 = inner_nullable_2.column(1);
assert_eq!(nullable_leaf_nullable_2, nullable_leaf_nullable_1);
let inner_non_null_2 = outer_2.column(1).as_struct();
assert_eq!(inner_non_null_2, inner_non_null_1);
let non_null_leaf_non_null_2 = inner_non_null_2.column(0);
assert_eq!(non_null_leaf_non_null_2, non_null_leaf_non_null_1);
let non_null_leaf_nullable_2 = inner_non_null_2.column(1);
assert_eq!(non_null_leaf_nullable_2, non_null_leaf_nullable_1);
}
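// (source field, target field, column data) triple for the coerce_batch_nullability tests;
// make_coerce_pair yields one case coercing non-null -> nullable and one coercing nullable -> non-null.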
type CoerceTestCase = (Arc<ArrowField>, Arc<ArrowField>, Arc<dyn ArrowArray>);
fn make_coerce_pair(
make_field: impl Fn(bool) -> Arc<ArrowField>,
make_col: impl Fn(bool) -> Arc<dyn ArrowArray>,
) -> (CoerceTestCase, CoerceTestCase) {
(
(make_field(false), make_field(true), make_col(false)),
(make_field(true), make_field(false), make_col(true)),
)
}
fn make_map_entries_field(value_field: ArrowField) -> Arc<ArrowField> {
Arc::new(ArrowField::new(
"entries",
ArrowDataType::Struct(ArrowFields::from(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
value_field,
])),
false,
))
}
fn make_map_array(
entries_field: Arc<ArrowField>,
values: Arc<dyn ArrowArray>,
) -> Arc<dyn ArrowArray> {
let entry_fields = match entries_field.data_type() {
ArrowDataType::Struct(f) => f.clone(),
_ => unreachable!(),
};
let keys: Arc<dyn ArrowArray> = Arc::new(StringArray::from(vec!["a", "b"]));
let entries = StructArray::try_new(entry_fields, vec![keys, values], None).unwrap();
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 1, 2]));
Arc::new(MapArray::try_new(entries_field, offsets, entries, None, false).unwrap())
}
fn create_data_int() -> (CoerceTestCase, CoerceTestCase) {
let col: Arc<dyn ArrowArray> = Arc::new(Int32Array::from(vec![1, 2]));
make_coerce_pair(
|leaf_nullable| Arc::new(ArrowField::new("col", ArrowDataType::Int32, leaf_nullable)),
|_leaf_nullable| col.clone(),
)
}
fn create_data_string() -> (CoerceTestCase, CoerceTestCase) {
let col: Arc<dyn ArrowArray> = Arc::new(StringArray::from(vec!["a", "b"]));
make_coerce_pair(
|leaf_nullable| Arc::new(ArrowField::new("col", ArrowDataType::Utf8, leaf_nullable)),
|_leaf_nullable| col.clone(),
)
}
fn create_data_struct() -> (CoerceTestCase, CoerceTestCase) {
let inner_col: Arc<dyn ArrowArray> = Arc::new(Int32Array::from(vec![1, 2]));
make_coerce_pair(
|leaf_nullable| {
Arc::new(ArrowField::new(
"c",
ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
"val",
ArrowDataType::Int32,
leaf_nullable,
)])),
false,
))
},
|leaf_nullable| {
let inner = ArrowField::new("val", ArrowDataType::Int32, leaf_nullable);
Arc::new(
StructArray::try_new(
ArrowFields::from(vec![inner]),
vec![inner_col.clone()],
None,
)
.unwrap(),
)
},
)
}
fn create_data_list() -> (CoerceTestCase, CoerceTestCase) {
let values: Arc<dyn ArrowArray> = Arc::new(Int32Array::from(vec![1, 2, 3]));
make_coerce_pair(
|leaf_nullable| {
let elem = Arc::new(ArrowField::new("item", ArrowDataType::Int32, leaf_nullable));
Arc::new(ArrowField::new("col", ArrowDataType::List(elem), false))
},
|leaf_nullable| {
let elem = Arc::new(ArrowField::new("item", ArrowDataType::Int32, leaf_nullable));
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3]));
Arc::new(
GenericListArray::<i32>::try_new(elem, offsets, values.clone(), None).unwrap(),
)
},
)
}
fn create_data_map() -> (CoerceTestCase, CoerceTestCase) {
make_coerce_pair(
|leaf_nullable| {
let entries_field = make_map_entries_field(ArrowField::new(
"value",
ArrowDataType::Int32,
leaf_nullable,
));
Arc::new(ArrowField::new(
"c",
ArrowDataType::Map(entries_field, false),
false,
))
},
|leaf_nullable| {
let entries_field = make_map_entries_field(ArrowField::new(
"value",
ArrowDataType::Int32,
leaf_nullable,
));
make_map_array(entries_field, Arc::new(Int32Array::from(vec![1, 2])))
},
)
}
fn create_data_struct_in_list() -> (CoerceTestCase, CoerceTestCase) {
make_coerce_pair(
|leaf_nullable| {
let leaf = ArrowField::new("val", ArrowDataType::Int32, leaf_nullable);
let elem = Arc::new(ArrowField::new(
"item",
ArrowDataType::Struct(ArrowFields::from(vec![leaf])),
false,
));
Arc::new(ArrowField::new("col", ArrowDataType::List(elem), false))
},
|leaf_nullable| {
let leaf = ArrowField::new("val", ArrowDataType::Int32, leaf_nullable);
let inner_col: Arc<dyn ArrowArray> = Arc::new(Int32Array::from(vec![1, 2, 3]));
let elem_field = Arc::new(ArrowField::new(
"item",
ArrowDataType::Struct(ArrowFields::from(vec![leaf.clone()])),
false,
));
let structs =
StructArray::try_new(ArrowFields::from(vec![leaf]), vec![inner_col], None)
.unwrap();
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3]));
Arc::new(
GenericListArray::<i32>::try_new(elem_field, offsets, Arc::new(structs), None)
.unwrap(),
)
},
)
}
fn create_data_list_in_struct() -> (CoerceTestCase, CoerceTestCase) {
make_coerce_pair(
|leaf_nullable| {
let elem = Arc::new(ArrowField::new("item", ArrowDataType::Int32, leaf_nullable));
let list_field = ArrowField::new("vals", ArrowDataType::List(elem), false);
Arc::new(ArrowField::new(
"c",
ArrowDataType::Struct(ArrowFields::from(vec![list_field])),
false,
))
},
|leaf_nullable| {
let elem = Arc::new(ArrowField::new("item", ArrowDataType::Int32, leaf_nullable));
let values: Arc<dyn ArrowArray> = Arc::new(Int32Array::from(vec![1, 2, 3]));
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3]));
let list_col: Arc<dyn ArrowArray> = Arc::new(
GenericListArray::<i32>::try_new(elem, offsets, values, None).unwrap(),
);
let list_field = ArrowField::new("vals", list_col.data_type().clone(), false);
Arc::new(
StructArray::try_new(ArrowFields::from(vec![list_field]), vec![list_col], None)
.unwrap(),
)
},
)
}
fn create_data_list_in_map() -> (CoerceTestCase, CoerceTestCase) {
make_coerce_pair(
|leaf_nullable| {
let elem = Arc::new(ArrowField::new("item", ArrowDataType::Int32, leaf_nullable));
let entries_field = make_map_entries_field(ArrowField::new(
"value",
ArrowDataType::List(elem),
true,
));
Arc::new(ArrowField::new(
"c",
ArrowDataType::Map(entries_field, false),
false,
))
},
|leaf_nullable| {
let elem = Arc::new(ArrowField::new("item", ArrowDataType::Int32, leaf_nullable));
let list_values: Arc<dyn ArrowArray> = Arc::new(Int32Array::from(vec![1, 2]));
let list_offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 1, 2]));
let list_col: Arc<dyn ArrowArray> = Arc::new(
GenericListArray::<i32>::try_new(elem, list_offsets, list_values, None)
.unwrap(),
);
let entries_field = make_map_entries_field(ArrowField::new(
"value",
ArrowDataType::List(Arc::new(ArrowField::new(
"item",
ArrowDataType::Int32,
leaf_nullable,
))),
true,
));
make_map_array(entries_field, list_col)
},
)
}
fn create_data_struct_in_map() -> (CoerceTestCase, CoerceTestCase) {
make_coerce_pair(
|leaf_nullable| {
let leaf = ArrowField::new("val", ArrowDataType::Int32, leaf_nullable);
let value_field = ArrowField::new(
"value",
ArrowDataType::Struct(ArrowFields::from(vec![leaf])),
true,
);
let entries_field = make_map_entries_field(value_field);
Arc::new(ArrowField::new(
"c",
ArrowDataType::Map(entries_field, false),
false,
))
},
|leaf_nullable| {
let leaf = ArrowField::new("val", ArrowDataType::Int32, leaf_nullable);
let inner: Arc<dyn ArrowArray> = Arc::new(Int32Array::from(vec![1, 2]));
let struct_col: Arc<dyn ArrowArray> = Arc::new(
StructArray::try_new(ArrowFields::from(vec![leaf.clone()]), vec![inner], None)
.unwrap(),
);
let value_field = ArrowField::new(
"value",
ArrowDataType::Struct(ArrowFields::from(vec![leaf])),
true,
);
let entries_field = make_map_entries_field(value_field);
make_map_array(entries_field, struct_col)
},
)
}
fn create_data_map_in_struct() -> (CoerceTestCase, CoerceTestCase) {
make_coerce_pair(
|leaf_nullable| {
let entries_field = make_map_entries_field(ArrowField::new(
"value",
ArrowDataType::Int32,
leaf_nullable,
));
let map_field =
ArrowField::new("map_child", ArrowDataType::Map(entries_field, false), false);
Arc::new(ArrowField::new(
"c",
ArrowDataType::Struct(ArrowFields::from(vec![map_field])),
false,
))
},
|leaf_nullable| {
let entries_field = make_map_entries_field(ArrowField::new(
"value",
ArrowDataType::Int32,
leaf_nullable,
));
let map_col = make_map_array(entries_field, Arc::new(Int32Array::from(vec![1, 2])));
let map_child_field =
ArrowField::new("map_child", map_col.data_type().clone(), false);
Arc::new(
StructArray::try_new(
ArrowFields::from(vec![map_child_field]),
vec![map_col],
None,
)
.unwrap(),
)
},
)
}
fn create_data_map_in_list() -> (CoerceTestCase, CoerceTestCase) {
make_coerce_pair(
|leaf_nullable| {
let entries_field = make_map_entries_field(ArrowField::new(
"value",
ArrowDataType::Int32,
leaf_nullable,
));
let map_elem = Arc::new(ArrowField::new(
"item",
ArrowDataType::Map(entries_field, false),
false,
));
Arc::new(ArrowField::new("col", ArrowDataType::List(map_elem), false))
},
|leaf_nullable| {
let entries_field = make_map_entries_field(ArrowField::new(
"value",
ArrowDataType::Int32,
leaf_nullable,
));
let map_col = make_map_array(entries_field, Arc::new(Int32Array::from(vec![1, 2])));
let map_elem =
Arc::new(ArrowField::new("item", map_col.data_type().clone(), false));
let list_offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 1, 2]));
Arc::new(
GenericListArray::<i32>::try_new(map_elem, list_offsets, map_col, None)
.unwrap(),
)
},
)
}
use rstest::rstest;
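// Renames each case's top-level field to a unique "c{idx}" so multiple cases can share one schema.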
fn allocate_name_to_field(idx: usize, (src, tgt, col): CoerceTestCase) -> CoerceTestCase {
let name = format!("c{idx}");
let src = Arc::new(ArrowField::new(
&name,
src.data_type().clone(),
src.is_nullable(),
));
let tgt = Arc::new(ArrowField::new(
&name,
tgt.data_type().clone(),
tgt.is_nullable(),
));
(src, tgt, col)
}
#[rstest]
#[case::int_and_struct_in_list(vec![
create_data_int(), create_data_struct_in_list(),
], true)]
#[case::string_and_list_in_struct(vec![
create_data_string(), create_data_list_in_struct(),
], false)]
#[case::all_simple_types(vec![
create_data_int(), create_data_string(), create_data_struct(),
create_data_list(), create_data_map(),
], true)]
#[case::all_nested_types(vec![
create_data_struct_in_list(), create_data_list_in_struct(),
create_data_list_in_map(), create_data_struct_in_map(),
create_data_map_in_struct(), create_data_map_in_list(),
], false)]
#[case::mixed_to_nullable(vec![
create_data_int(), create_data_struct(), create_data_map_in_list(),
create_data_struct_in_map(),
], true)]
#[case::mixed_to_non_null(vec![
create_data_list(), create_data_map(), create_data_list_in_map(),
create_data_map_in_struct(),
], false)]
fn test_coerce_batch_nullability(
#[case] data: Vec<(CoerceTestCase, CoerceTestCase)>,
#[case] to_nullable: bool,
) {
let (src_fields, tgt_fields, cols): (Vec<_>, Vec<_>, Vec<_>) = data
.into_iter()
.enumerate()
.map(|(i, (to_nullable_case, to_non_null_case))| {
let case = if to_nullable {
to_nullable_case
} else {
to_non_null_case
};
allocate_name_to_field(i, case)
})
.multiunzip();
let src_schema = Arc::new(ArrowSchema::new(src_fields));
let target_schema = Arc::new(ArrowSchema::new(tgt_fields));
assert_ne!(src_schema, target_schema);
let batch = RecordBatch::try_new(src_schema, cols).unwrap();
let result = coerce_batch_nullability(batch, &target_schema, None).unwrap();
assert_eq!(*result.schema(), *target_schema);
}
#[test]
fn test_coerce_batch_nullability_schema_already_matches() {
let field = ArrowField::new("a", ArrowDataType::Int32, false);
let col: Arc<dyn ArrowArray> = Arc::new(Int32Array::from(vec![1, 2]));
let schema = Arc::new(ArrowSchema::new(vec![field]));
let batch = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
let result = coerce_batch_nullability(batch.clone(), &schema, None).unwrap();
assert_eq!(result, batch);
}
#[test]
fn test_coerce_batch_nullability_type_mismatch_rejected_without_validator() {
let ((int_src_field, _, int_col), _) = create_data_int();
let (_, (_, string_tgt_field, _)) = create_data_string();
let src_schema = Arc::new(ArrowSchema::new(vec![int_src_field.as_ref().clone()]));
let target_schema = Arc::new(ArrowSchema::new(vec![string_tgt_field.as_ref().clone()]));
let batch = RecordBatch::try_new(src_schema, vec![int_col]).unwrap();
let result = coerce_batch_nullability(batch, &target_schema, None);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("data type mismatch"),
"unexpected error: {err}"
);
}
#[test]
fn test_coerce_batch_nullability_type_mismatch_allowed_with_validator() {
let ((int_src_field, _, int_col), _) = create_data_int();
let ((_, string_tgt_field, _), _) = create_data_string();
let src_schema = Arc::new(ArrowSchema::new(vec![int_src_field.as_ref().clone()]));
let target_schema = Arc::new(ArrowSchema::new(vec![string_tgt_field.as_ref().clone()]));
let batch = RecordBatch::try_new(src_schema, vec![int_col]).unwrap();
let allow_all = |_: &ArrowFieldRef, _: &ArrowFieldRef| Ok(());
let result = coerce_batch_nullability(batch, &target_schema, Some(&allow_all)).unwrap();
assert_eq!(result.schema().field(0).data_type(), &ArrowDataType::Int32);
assert!(result.schema().field(0).is_nullable());
}
#[test]
fn test_coerce_batch_nullability_preserves_field_metadata() {
use std::collections::HashMap;
let meta = |key: &str| HashMap::from([(key.to_string(), "val".to_string())]);
let offsets_1 = || OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 1]));
fn make_all_nullable(field: &ArrowField) -> ArrowField {
let dt = match field.data_type() {
ArrowDataType::Struct(children) => ArrowDataType::Struct(
children
.iter()
.map(|f| Arc::new(make_all_nullable(f)))
.collect(),
),
ArrowDataType::List(elem) => ArrowDataType::List(Arc::new(make_all_nullable(elem))),
ArrowDataType::Map(entries, ordered) => {
let inner = make_all_nullable(entries).with_nullable(false);
ArrowDataType::Map(Arc::new(inner), *ordered)
}
other => other.clone(),
};
field.clone().with_data_type(dt).with_nullable(true)
}
let src_item = Arc::new(
ArrowField::new("item", ArrowDataType::Int32, false).with_metadata(meta("item")),
);
let src_entries = Arc::new(ArrowField::new(
"entries",
ArrowDataType::Struct(ArrowFields::from(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Int32, false).with_metadata(meta("value")),
])),
false,
));
let src_list = ArrowField::new("lst", ArrowDataType::List(src_item.clone()), false)
.with_metadata(meta("list"));
let src_map = ArrowField::new("mp", ArrowDataType::Map(src_entries.clone(), false), false)
.with_metadata(meta("map"));
let src_struct = ArrowField::new(
"s",
ArrowDataType::Struct(ArrowFields::from(vec![src_list.clone(), src_map.clone()])),
false,
)
.with_metadata(meta("struct"));
let tgt_struct = make_all_nullable(&src_struct);
let entry_fields = match src_entries.data_type() {
ArrowDataType::Struct(f) => f.clone(),
_ => unreachable!(),
};
let list_col: Arc<dyn ArrowArray> = Arc::new(
GenericListArray::<i32>::try_new(
src_item,
offsets_1(),
Arc::new(Int32Array::from(vec![1])),
None,
)
.unwrap(),
);
let entries = StructArray::try_new(
entry_fields,
vec![
Arc::new(StringArray::from(vec!["a"])) as _,
Arc::new(Int32Array::from(vec![10])) as _,
],
None,
)
.unwrap();
let map_col: Arc<dyn ArrowArray> =
Arc::new(MapArray::try_new(src_entries, offsets_1(), entries, None, false).unwrap());
let struct_col: Arc<dyn ArrowArray> = Arc::new(
StructArray::try_new(
ArrowFields::from(vec![src_list, src_map]),
vec![list_col, map_col],
None,
)
.unwrap(),
);
let src_schema = Arc::new(ArrowSchema::new(vec![src_struct]));
let target_schema = Arc::new(ArrowSchema::new(vec![tgt_struct]));
let batch = RecordBatch::try_new(src_schema, vec![struct_col]).unwrap();
let result = coerce_batch_nullability(batch, &target_schema, None).unwrap();
let schema = result.schema();
let s = schema.field(0);
assert_eq!(s.metadata(), &meta("struct"));
assert!(s.is_nullable());
let fields = match s.data_type() {
ArrowDataType::Struct(f) => f,
other => panic!("expected Struct, got {other:?}"),
};
assert_eq!(fields[0].metadata(), &meta("list"));
assert!(fields[0].is_nullable());
let list_item = match fields[0].data_type() {
ArrowDataType::List(e) => e,
other => panic!("expected List, got {other:?}"),
};
assert_eq!(list_item.metadata(), &meta("item"));
assert!(list_item.is_nullable());
assert_eq!(fields[1].metadata(), &meta("map"));
assert!(fields[1].is_nullable());
let map_val = match fields[1].data_type() {
ArrowDataType::Map(e, _) => match e.data_type() {
ArrowDataType::Struct(f) => &f[1],
other => panic!("expected Struct entries, got {other:?}"),
},
other => panic!("expected Map, got {other:?}"),
};
assert_eq!(map_val.metadata(), &meta("value"));
assert!(map_val.is_nullable());
}
const FILE_PATH: &str = "s3://bucket/test.json";
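// The JSON arrow schema omits metadata columns; reorder_struct_array then inserts "_file"
// back at the requested position.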
struct JsonInsertCase {
schema: StructType,
expected_json_names: &'static [&'static str],
expected_output_names: &'static [&'static str],
file_path_col: Option<usize>,
}
#[rstest]
#[case::no_file_path(JsonInsertCase {
schema: StructType::new_unchecked([
StructField::not_null("a", DataType::INTEGER),
StructField::nullable("b", DataType::INTEGER),
]),
expected_json_names: &["a", "b"],
expected_output_names: &["a", "b"],
file_path_col: None,
})]
#[case::file_path_at_start(JsonInsertCase {
schema: StructType::new_unchecked([
StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
StructField::not_null("a", DataType::INTEGER),
StructField::nullable("b", DataType::INTEGER),
]),
expected_json_names: &["a", "b"],
expected_output_names: &["_file", "a", "b"],
file_path_col: Some(0),
})]
#[case::file_path_in_middle(JsonInsertCase {
schema: StructType::new_unchecked([
StructField::not_null("a", DataType::INTEGER),
StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
StructField::nullable("b", DataType::INTEGER),
]),
expected_json_names: &["a", "b"],
expected_output_names: &["a", "_file", "b"],
file_path_col: Some(1),
})]
#[case::file_path_at_end(JsonInsertCase {
schema: StructType::new_unchecked([
StructField::not_null("a", DataType::INTEGER),
StructField::nullable("b", DataType::INTEGER),
StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
]),
expected_json_names: &["a", "b"],
expected_output_names: &["a", "b", "_file"],
file_path_col: Some(2),
})]
fn test_json_file_path_insertion(#[case] case: JsonInsertCase) {
let json_schema = json_arrow_schema(&case.schema).unwrap();
let json_names: Vec<_> = json_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
assert_eq!(json_names, case.expected_json_names);
let arrow_schema = Arc::new(json_schema);
let cols: Vec<ArrowArrayRef> = (0..arrow_schema.fields().len())
.map(|_| Arc::new(Int32Array::from(vec![1i32, 2, 3])) as _)
.collect();
let batch = RecordBatch::try_new(arrow_schema, cols).unwrap();
let indices = build_json_reorder_indices(&case.schema).unwrap();
let result = RecordBatch::from(
reorder_struct_array(batch.into(), &indices, None, Some(FILE_PATH)).unwrap(),
);
let schema = result.schema();
let output_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(output_names, case.expected_output_names);
assert_eq!(result.num_rows(), 3);
if let Some(idx) = case.file_path_col {
let arr = result
.column(idx)
.as_any()
.downcast_ref::<StringArray>()
.expect("_file column should be a StringArray");
assert!(arr.iter().all(|v| v == Some(FILE_PATH)));
} else {
assert!(
result.schema().fields().iter().all(|f| f.name() != "_file"),
"_file should not appear when not declared in the schema"
);
}
}
#[test]
fn test_build_json_reorder_indices_unsupported_metadata_column_errors() {
let schema = StructType::new_unchecked([
StructField::not_null("a", DataType::INTEGER),
StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
]);
let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
ArrowDataType::Int32,
false,
)]));
let batch = RecordBatch::try_new(
arrow_schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let indices = build_json_reorder_indices(&schema).unwrap();
assert!(reorder_struct_array(batch.into(), &indices, None, None).is_err());
}
#[test]
fn ensure_we_encode_maps_with_null_values() {
let schema = ArrowSchema::new(vec![
ArrowField::new("str_col", ArrowDataType::Utf8, false),
ArrowField::new(
"map_col",
ArrowDataType::Map(
Arc::new(ArrowField::new(
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("keys", ArrowDataType::Utf8, false),
ArrowField::new("values", ArrowDataType::Utf8, true),
]
.into(),
),
false,
)),
false,
),
false,
),
]);
let s_array = StringArray::from(vec!["foo"]);
let string_builder = StringBuilder::new();
let string_builder2 = StringBuilder::new();
let mut map_builder = MapBuilder::new(None, string_builder, string_builder2);
map_builder.keys().append_value("bar");
map_builder.values().append_null();
map_builder.append(true).unwrap();
let map_array: MapArray = map_builder.finish();
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(s_array), Arc::new(map_array)],
)
.unwrap();
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(batch));
let filtered_data = FilteredEngineData::with_all_rows_selected(data);
let json = to_json_bytes(Box::new(std::iter::once(Ok(filtered_data)))).unwrap();
assert_eq!(
json,
"{\"str_col\":\"foo\",\"map_col\":{\"bar\":null}}\n".as_bytes()
);
}
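// A requested struct whose (all-nullable) children have no match in parquet is synthesized as a
// missing column, whether or not the struct itself is nullable.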
#[rstest]
fn struct_with_all_nullable_children_unmatched_is_missing(
#[values(true, false)] struct_nullable: bool,
) {
let info_field = if struct_nullable {
StructField::nullable(
"info",
StructType::new_unchecked([StructField::nullable("z", DataType::LONG)]),
)
} else {
StructField::not_null(
"info",
StructType::new_unchecked([StructField::nullable("z", DataType::LONG)]),
)
};
let requested_schema = Arc::new(StructType::new_unchecked([
StructField::not_null("a", DataType::LONG),
info_field,
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("a", ArrowDataType::Int64, true),
ArrowField::new(
"info",
ArrowDataType::Struct(
vec![
ArrowField::new("x", ArrowDataType::Int64, true),
ArrowField::new("y", ArrowDataType::Utf8, true),
]
.into(),
),
!struct_nullable,
),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
assert_eq!(mask_indices, vec![0]);
let expected_info_field = Arc::new(
requested_schema
.field("info")
.unwrap()
.try_into_arrow()
.unwrap(),
);
assert_eq!(
reorder_indices,
vec![
ReorderIndex::identity(0),
ReorderIndex::missing(1, expected_info_field),
]
);
}
#[test]
fn reorder_non_nullable_missing_struct_produces_non_null_struct() {
let a_array: Arc<dyn ArrowArray> = Arc::new(Int64Array::from(vec![1, 2, 3]));
let input = StructArray::from(vec![(
Arc::new(ArrowField::new("a", ArrowDataType::Int64, false)),
a_array,
)]);
let missing_field = Arc::new(ArrowField::new(
"info",
ArrowDataType::Struct(vec![ArrowField::new("z", ArrowDataType::Int64, true)].into()),
false,
));
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::missing(1, missing_field),
];
let ordered = reorder_struct_array(input, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["a", "info"]);
let info = ordered.column(1).as_struct();
assert_eq!(info.null_count(), 0);
assert_eq!(info.column(0).null_count(), 3);
}
#[test]
fn reorder_nested_non_nullable_missing_struct_recurses() {
let a_array: Arc<dyn ArrowArray> = Arc::new(Int64Array::from(vec![1, 2]));
let input = StructArray::from(vec![(
Arc::new(ArrowField::new("a", ArrowDataType::Int64, false)),
a_array,
)]);
let inner_struct =
ArrowDataType::Struct(vec![ArrowField::new("leaf", ArrowDataType::Int64, true)].into());
let missing_field = Arc::new(ArrowField::new(
"outer",
ArrowDataType::Struct(vec![ArrowField::new("inner", inner_struct, false)].into()),
false,
));
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::missing(1, missing_field),
];
let ordered = reorder_struct_array(input, &reorder, None, None).unwrap();
let outer = ordered.column(1).as_struct();
assert_eq!(outer.null_count(), 0);
let inner = outer.column(0).as_struct();
assert_eq!(inner.null_count(), 0);
assert_eq!(inner.column(0).null_count(), 2);
}
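// The empty requested struct matches the empty parquet struct but has no leaves to read, so
// nothing is masked for it and the column is filled in via a missing reorder entry.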
#[test]
fn empty_struct_is_matched() {
let requested_schema = Arc::new(StructType::new_unchecked([
StructField::not_null("a", DataType::LONG),
StructField::not_null("empty", StructType::new_unchecked([])),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("a", ArrowDataType::Int64, true),
ArrowField::new("empty", ArrowDataType::Struct(ArrowFields::empty()), false),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
assert_eq!(mask_indices, vec![0]);
let expected_empty_field = Arc::new(
requested_schema
.field("empty")
.unwrap()
.try_into_arrow()
.unwrap(),
);
assert_eq!(
reorder_indices,
vec![
ReorderIndex::identity(0),
ReorderIndex::missing(1, expected_empty_field),
]
);
}
}