use std::{collections::HashSet, sync::Arc};
use crate::{
schema::{DataType, PrimitiveType, Schema, SchemaRef, StructField, StructType},
utils::require,
DeltaResult, Error,
};
use arrow_array::{
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
StructArray,
};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
SchemaRef as ArrowSchemaRef,
};
use itertools::Itertools;
use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor};
use tracing::debug;
/// Expands to an early `return` that evaluates `$left_arr IN $right_arr` element-wise.
///
/// For each `($data_ty, $prim_ty)` pair, if `$left_arr.data_type()` matches the pattern
/// `$data_ty`:
/// - `$left_arr` is downcast to a primitive array of `$prim_ty`;
/// - `$right_arr` is downcast to a `List` array with `i32` offsets;
/// - the result of `arrow_ord::comparison::in_list` is passed through
///   `wrap_comparison_result`.
///
/// A failed downcast returns an `Error::invalid_expression` via `?`. Any other data type
/// produces an `ArrowError::CastError`. Arrow errors are converted with `Error::generic_err`.
///
/// The macro names `ArrowError`, `wrap_comparison_result` and the `AsArray` downcast methods
/// without importing them, so they must be in scope wherever the macro is invoked.
macro_rules! prim_array_cmp {
    ( $left_arr: ident, $right_arr: ident, $(($data_ty: pat, $prim_ty: ty)),+ ) => {
        return match $left_arr.data_type() {
            $(
                $data_ty => {
                    // Errors are built lazily so the success path doesn't format strings.
                    let prim_array = $left_arr.as_primitive_opt::<$prim_ty>()
                        .ok_or_else(|| Error::invalid_expression(
                            format!("Cannot cast to primitive array: {}", $left_arr.data_type()))
                        )?;
                    let list_array = $right_arr.as_list_opt::<i32>()
                        .ok_or_else(|| Error::invalid_expression(
                            format!("Cannot cast to list array: {}", $right_arr.data_type()))
                        )?;
                    arrow_ord::comparison::in_list(prim_array, list_array).map(wrap_comparison_result)
                }
            )+
            _ => Err(ArrowError::CastError(
                format!("Bad Comparison between: {:?} and {:?}",
                    $left_arr.data_type(),
                    $right_arr.data_type())
                )
            )
        }.map_err(Error::generic_err);
    };
}
pub(crate) use prim_array_cmp;
fn make_arrow_error(s: String) -> Error {
Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s))
}
/// Outcome of comparing a requested kernel type with the arrow type found in the data
/// (produced by `check_cast_compat` and `ensure_data_types`).
pub(crate) enum DataTypeCompat {
    /// The types match exactly; the data can be used as-is.
    Identical,
    /// The types are compatible, but the data must be cast to the contained arrow type.
    NeedsCast(ArrowDataType),
    /// A map or struct whose children were checked recursively.
    Nested,
}
/// Decides whether data of `source_type` can be used where `target_type` is expected.
///
/// Equal types are `Identical`. Two timestamp types (of any unit/timezone) are compatible but
/// need a cast to `target_type`. Every other combination is an error.
fn check_cast_compat(
    target_type: ArrowDataType,
    source_type: &ArrowDataType,
) -> DeltaResult<DataTypeCompat> {
    if *source_type == target_type {
        return Ok(DataTypeCompat::Identical);
    }
    let both_timestamps = matches!(
        (source_type, &target_type),
        (ArrowDataType::Timestamp(..), ArrowDataType::Timestamp(..))
    );
    if both_timestamps {
        Ok(DataTypeCompat::NeedsCast(target_type))
    } else {
        Err(make_arrow_error(format!(
            "Incorrect datatype. Expected {}, got {}",
            target_type, source_type
        )))
    }
}
/// Checks that `arrow_type` (as found in the data) is compatible with the requested
/// `kernel_type`, recursing into arrays, maps and structs.
///
/// Returns:
/// - `DataTypeCompat::Identical` when no conversion is needed;
/// - `DataTypeCompat::NeedsCast` when a primitive must be cast (only between timestamp types);
/// - `DataTypeCompat::Nested` for maps and structs whose children all checked out.
///
/// For arrays, the element type's result is returned.
///
/// # Errors
/// Returns an error in these cases:
/// - the types are incompatible;
/// - an arrow map's entries are not a struct of exactly two fields;
/// - an arrow struct lacks any field present in the kernel struct.
pub(crate) fn ensure_data_types(
    kernel_type: &DataType,
    arrow_type: &ArrowDataType,
) -> DeltaResult<DataTypeCompat> {
    match (kernel_type, arrow_type) {
        (DataType::Primitive(_), _) if arrow_type.is_primitive() => {
            check_cast_compat(kernel_type.try_into()?, arrow_type)
        }
        (DataType::Primitive(PrimitiveType::Boolean), ArrowDataType::Boolean)
        | (DataType::Primitive(PrimitiveType::String), ArrowDataType::Utf8)
        | (DataType::Primitive(PrimitiveType::Binary), ArrowDataType::Binary) => {
            Ok(DataTypeCompat::Identical)
        }
        (
            DataType::Primitive(PrimitiveType::Decimal(kernel_prec, kernel_scale)),
            ArrowDataType::Decimal128(arrow_prec, arrow_scale),
        ) if arrow_prec == kernel_prec && *arrow_scale == *kernel_scale as i8 => {
            Ok(DataTypeCompat::Identical)
        }
        (DataType::Array(inner_type), ArrowDataType::List(arrow_list_type)) => {
            let kernel_array_type = &inner_type.element_type;
            let arrow_list_type = arrow_list_type.data_type();
            ensure_data_types(kernel_array_type, arrow_list_type)
        }
        (DataType::Map(kernel_map_type), ArrowDataType::Map(arrow_map_type, _)) => {
            // Arrow maps store their entries as a struct of (key, value).
            if let ArrowDataType::Struct(fields) = arrow_map_type.data_type() {
                let mut fields = fields.iter();
                if let Some(key_type) = fields.next() {
                    ensure_data_types(&kernel_map_type.key_type, key_type.data_type())?;
                } else {
                    return Err(make_arrow_error(
                        "Arrow map struct didn't have a key type".to_string(),
                    ));
                }
                if let Some(value_type) = fields.next() {
                    ensure_data_types(&kernel_map_type.value_type, value_type.data_type())?;
                } else {
                    return Err(make_arrow_error(
                        "Arrow map struct didn't have a value type".to_string(),
                    ));
                }
                if fields.next().is_some() {
                    return Err(Error::generic("map fields had more than 2 members"));
                }
                Ok(DataTypeCompat::Nested)
            } else {
                Err(make_arrow_error(
                    "Arrow map type wasn't a struct.".to_string(),
                ))
            }
        }
        (DataType::Struct(kernel_fields), ArrowDataType::Struct(arrow_fields)) => {
            // Pair each arrow field with the kernel field of the same name, skipping arrow
            // fields the kernel struct doesn't mention. Pairing inside the same `filter_map`
            // keeps both sides aligned. Zipping the filtered kernel fields with *all* arrow
            // fields would compare the wrong fields whenever the arrow struct has extras.
            let mut found_fields = 0;
            for (kernel_field, arrow_field) in arrow_fields.iter().filter_map(|arrow_field| {
                kernel_fields
                    .fields
                    .get(arrow_field.name())
                    .map(|kernel_field| (kernel_field, arrow_field))
            }) {
                ensure_data_types(&kernel_field.data_type, arrow_field.data_type())?;
                found_fields += 1;
            }
            // Every kernel field must have been matched by some arrow field.
            require!(kernel_fields.fields.len() == found_fields, {
                let arrow_field_map: HashSet<&String> =
                    HashSet::from_iter(arrow_fields.iter().map(|f| f.name()));
                let missing_field_names = kernel_fields
                    .fields
                    .keys()
                    .filter(|kernel_field| !arrow_field_map.contains(kernel_field))
                    .take(5)
                    .join(", ");
                make_arrow_error(format!(
                    "Missing Struct fields {} (Up to five missing fields shown)",
                    missing_field_names
                ))
            });
            Ok(DataTypeCompat::Nested)
        }
        _ => Err(make_arrow_error(format!(
            "Incorrect datatype. Expected {}, got {}",
            kernel_type, arrow_type
        ))),
    }
}
/// Describes one column of a struct read from parquet: where it must end up in the requested
/// ordering and how it must be transformed on the way there.
#[derive(Debug, PartialEq)]
pub(crate) struct ReorderIndex {
    /// Position of this column in the output (requested) ordering.
    pub(crate) index: usize,
    /// What to do to the column before placing it at `index`.
    transform: ReorderIndexTransform,
}
/// How a column is transformed while being moved to its requested position.
#[derive(Debug, PartialEq)]
pub(crate) enum ReorderIndexTransform {
    /// Cast the column to the given arrow type.
    Cast(ArrowDataType),
    /// The column is a struct (or a list of structs) whose children must themselves be
    /// reordered according to the contained indices.
    Nested(Vec<ReorderIndex>),
    /// Use the column unchanged.
    Identity,
    /// The column is absent from the data; emit an all-null column described by this field.
    Missing(ArrowFieldRef),
}
impl ReorderIndex {
    /// Builds an index with an explicit transform.
    fn new(index: usize, transform: ReorderIndexTransform) -> Self {
        Self { index, transform }
    }

    /// The column at this slot must be cast to `target` and moved to `index`.
    fn cast(index: usize, target: ArrowDataType) -> Self {
        Self::new(index, ReorderIndexTransform::Cast(target))
    }

    /// The column is nested; its children are reordered by `children`, then it moves to `index`.
    fn nested(index: usize, children: Vec<ReorderIndex>) -> Self {
        Self::new(index, ReorderIndexTransform::Nested(children))
    }

    /// The column moves to `index` unchanged.
    fn identity(index: usize) -> Self {
        Self::new(index, ReorderIndexTransform::Identity)
    }

    /// No column exists in the data; a null column described by `field` goes at `index`.
    fn missing(index: usize, field: ArrowFieldRef) -> Self {
        Self::new(index, ReorderIndexTransform::Missing(field))
    }

    /// True if applying this index would alter the column: casts and missing columns always
    /// do, identities never do, and nested indices do if their children need it.
    fn needs_transform(&self) -> bool {
        match &self.transform {
            ReorderIndexTransform::Identity => false,
            ReorderIndexTransform::Nested(children) => ordering_needs_transform(children),
            ReorderIndexTransform::Cast(_) | ReorderIndexTransform::Missing(_) => true,
        }
    }
}
/// Counts the leaf (non-nested) columns stored under `field`.
fn count_cols(field: &ArrowField) -> usize {
    let data_type = field.data_type();
    _count_cols(data_type)
}

/// Counts the leaf columns needed for a value of type `dt`.
///
/// - Structs and unions sum over their children.
/// - List-like and map types count their single inner field.
/// - Dictionaries count their value type.
/// - Every other type is one leaf.
fn _count_cols(dt: &ArrowDataType) -> usize {
    match dt {
        ArrowDataType::Dictionary(_, values) => _count_cols(values),
        ArrowDataType::List(inner)
        | ArrowDataType::LargeList(inner)
        | ArrowDataType::FixedSizeList(inner, _)
        | ArrowDataType::Map(inner, _) => count_cols(inner),
        ArrowDataType::Struct(children) => children
            .iter()
            .fold(0, |total, child| total + count_cols(child)),
        ArrowDataType::Union(variants, _) => variants
            .iter()
            .fold(0, |total, (_, child)| total + count_cols(child)),
        _ => 1,
    }
}
fn get_indices(
start_parquet_offset: usize,
requested_schema: &Schema,
fields: &Fields,
mask_indices: &mut Vec<usize>,
) -> DeltaResult<(usize, Vec<ReorderIndex>)> {
let mut found_fields = HashSet::with_capacity(requested_schema.fields.len());
let mut reorder_indices = Vec::with_capacity(requested_schema.fields.len());
let mut parquet_offset = start_parquet_offset;
let all_field_info = fields.iter().enumerate().map(|(parquet_index, field)| {
let field_info = requested_schema.fields.get_full(field.name());
(parquet_index, field, field_info)
});
for (parquet_index, field, field_info) in all_field_info {
debug!(
"Getting indices for field {} with offset {parquet_offset}, with index {parquet_index}",
field.name()
);
if let Some((index, _, requested_field)) = field_info {
match field.data_type() {
ArrowDataType::Struct(fields) => {
if let DataType::Struct(ref requested_schema) = requested_field.data_type {
let (parquet_advance, children) = get_indices(
parquet_index + parquet_offset,
requested_schema.as_ref(),
fields,
mask_indices,
)?;
parquet_offset += parquet_advance - 1;
found_fields.insert(requested_field.name());
reorder_indices.push(ReorderIndex::nested(index, children));
} else {
return Err(Error::unexpected_column_type(field.name()));
}
}
ArrowDataType::List(list_field)
| ArrowDataType::LargeList(list_field)
| ArrowDataType::ListView(list_field) => {
if let DataType::Array(array_type) = requested_field.data_type() {
let requested_schema = StructType::new(vec![StructField::new(
list_field.name().clone(), array_type.element_type.clone(),
array_type.contains_null,
)]);
let (parquet_advance, mut children) = get_indices(
parquet_index + parquet_offset,
&requested_schema,
&[list_field.clone()].into(),
mask_indices,
)?;
parquet_offset += parquet_advance - 1;
found_fields.insert(requested_field.name());
if children.len() != 1 {
return Err(Error::generic(
"List call should not have generated more than one reorder index",
));
}
let mut children = children.swap_remove(0);
children.index = index;
reorder_indices.push(children);
} else {
return Err(Error::unexpected_column_type(list_field.name()));
}
}
ArrowDataType::Map(key_val_field, _) => {
match (key_val_field.data_type(), requested_field.data_type()) {
(ArrowDataType::Struct(inner_fields), DataType::Map(map_type)) => {
let mut key_val_names =
inner_fields.iter().map(|f| f.name().to_string());
let key_name = key_val_names.next().ok_or_else(|| {
Error::generic("map fields didn't include a key col")
})?;
let val_name = key_val_names.next().ok_or_else(|| {
Error::generic("map fields didn't include a val col")
})?;
if key_val_names.next().is_some() {
return Err(Error::generic("map fields had more than 2 members"));
}
let inner_schema = map_type.as_struct_schema(key_name, val_name);
let (parquet_advance, _children) = get_indices(
parquet_index + parquet_offset,
&inner_schema,
inner_fields,
mask_indices,
)?;
parquet_offset += parquet_advance - 1;
found_fields.insert(requested_field.name());
reorder_indices.push(ReorderIndex::identity(index));
}
_ => {
return Err(Error::unexpected_column_type(field.name()));
}
}
}
_ => {
match ensure_data_types(&requested_field.data_type, field.data_type())? {
DataTypeCompat::Identical => {
reorder_indices.push(ReorderIndex::identity(index))
}
DataTypeCompat::NeedsCast(target) => {
reorder_indices.push(ReorderIndex::cast(index, target))
}
DataTypeCompat::Nested => {
return Err(Error::internal_error(
"Comparing nested types in get_indices",
))
}
}
found_fields.insert(requested_field.name());
mask_indices.push(parquet_offset + parquet_index);
}
}
} else {
debug!("Skipping over un-selected field: {}", field.name());
parquet_offset += count_cols(field) - 1;
}
}
if found_fields.len() != requested_schema.fields.len() {
for (requested_position, field) in requested_schema.fields().enumerate() {
if !found_fields.contains(field.name()) {
if field.nullable {
debug!("Inserting missing and nullable field: {}", field.name());
reorder_indices.push(ReorderIndex::missing(
requested_position,
Arc::new(field.try_into()?),
));
} else {
return Err(Error::Generic(format!(
"Requested field not found in parquet schema, and field is not nullable: {}",
field.name()
)));
}
}
}
}
Ok((
parquet_offset + fields.len() - start_parquet_offset,
reorder_indices,
))
}
/// Plans how to read `requested_schema` from a parquet file whose arrow schema is
/// `parquet_schema`.
///
/// Returns the parquet leaf-column indices to read (for the projection mask), plus the
/// reorder indices that turn the read data into the requested layout.
pub(crate) fn get_requested_indices(
    requested_schema: &SchemaRef,
    parquet_schema: &ArrowSchemaRef,
) -> DeltaResult<(Vec<usize>, Vec<ReorderIndex>)> {
    let mut mask_indices = Vec::new();
    get_indices(0, requested_schema, parquet_schema.fields(), &mut mask_indices)
        .map(|(_leaf_count, reorder_indexes)| (mask_indices, reorder_indexes))
}
/// Builds a parquet `ProjectionMask` that selects exactly the leaf columns listed in
/// `indices`.
///
/// The requested and arrow schemas are currently unused, and the result is always `Some`.
pub(crate) fn generate_mask(
    _requested_schema: &SchemaRef,
    _parquet_schema: &ArrowSchemaRef,
    parquet_physical_schema: &SchemaDescriptor,
    indices: &[usize],
) -> Option<ProjectionMask> {
    let leaves = indices.iter().copied();
    let mask = ProjectionMask::leaves(parquet_physical_schema, leaves);
    Some(mask)
}
fn ordering_needs_transform(requested_ordering: &[ReorderIndex]) -> bool {
if requested_ordering.is_empty() {
return false;
}
if requested_ordering[0].needs_transform() {
return true;
}
requested_ordering
.windows(2)
.any(|ri| (ri[0].index >= ri[1].index) || ri[1].needs_transform())
}
type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
pub(crate) fn reorder_struct_array(
input_data: StructArray,
requested_ordering: &[ReorderIndex],
) -> DeltaResult<StructArray> {
if !ordering_needs_transform(requested_ordering) {
Ok(input_data)
} else {
debug!("Have requested reorder {requested_ordering:#?} on {input_data:?}");
let num_rows = input_data.len();
let num_cols = requested_ordering.len();
let (input_fields, input_cols, null_buffer) = input_data.into_parts();
let mut final_fields_cols: Vec<FieldArrayOpt> = vec![None; num_cols];
for (parquet_position, reorder_index) in requested_ordering.iter().enumerate() {
match &reorder_index.transform {
ReorderIndexTransform::Cast(target) => {
let col = input_cols[parquet_position].as_ref();
let col = Arc::new(arrow_cast::cast::cast(col, target)?);
let new_field = Arc::new(
input_fields[parquet_position]
.as_ref()
.clone()
.with_data_type(col.data_type().clone()),
);
final_fields_cols[reorder_index.index] = Some((new_field, col));
}
ReorderIndexTransform::Nested(children) => {
match input_cols[parquet_position].data_type() {
ArrowDataType::Struct(_) => {
let struct_array = input_cols[parquet_position].as_struct().clone();
let result_array =
Arc::new(reorder_struct_array(struct_array, children)?);
let new_field = Arc::new(ArrowField::new_struct(
input_fields[parquet_position].name(),
result_array.fields().clone(),
input_fields[parquet_position].is_nullable(),
));
final_fields_cols[reorder_index.index] =
Some((new_field, result_array));
}
ArrowDataType::List(_) => {
let list_array = input_cols[parquet_position].as_list::<i32>().clone();
final_fields_cols[reorder_index.index] = reorder_list(
list_array,
input_fields[parquet_position].name(),
children,
)?;
}
ArrowDataType::LargeList(_) => {
let list_array = input_cols[parquet_position].as_list::<i64>().clone();
final_fields_cols[reorder_index.index] = reorder_list(
list_array,
input_fields[parquet_position].name(),
children,
)?;
}
_ => {
return Err(Error::internal_error(
"Nested reorder can only apply to struct/list/map.",
));
}
}
}
ReorderIndexTransform::Identity => {
final_fields_cols[reorder_index.index] = Some((
input_fields[parquet_position].clone(), input_cols[parquet_position].clone(), ));
}
ReorderIndexTransform::Missing(field) => {
let null_array = Arc::new(new_null_array(field.data_type(), num_rows));
let field = field.clone(); final_fields_cols[reorder_index.index] = Some((field, null_array));
}
}
}
let num_cols = final_fields_cols.len();
let (field_vec, reordered_columns): (Vec<Arc<ArrowField>>, _) =
final_fields_cols.into_iter().flatten().unzip();
if field_vec.len() != num_cols {
Err(Error::internal_error("Found a None in final_fields_cols."))
} else {
Ok(StructArray::try_new(
field_vec.into(),
reordered_columns,
null_buffer,
)?)
}
}
}
fn reorder_list<O: OffsetSizeTrait>(
list_array: GenericListArray<O>,
input_field_name: &str,
children: &[ReorderIndex],
) -> DeltaResult<FieldArrayOpt> {
let (list_field, offset_buffer, maybe_sa, null_buf) = list_array.into_parts();
if let Some(struct_array) = maybe_sa.as_struct_opt() {
let struct_array = struct_array.clone();
let result_array = Arc::new(reorder_struct_array(struct_array, children)?);
let new_list_field = Arc::new(ArrowField::new_struct(
list_field.name(),
result_array.fields().clone(),
result_array.is_nullable(),
));
let new_field = Arc::new(ArrowField::new_list(
input_field_name,
new_list_field.clone(),
list_field.is_nullable(),
));
let list = Arc::new(GenericListArray::try_new(
new_list_field,
offset_buffer,
result_array,
null_buf,
)?);
Ok(Some((new_field, list)))
} else {
Err(Error::internal_error(
"Nested reorder of list should have had struct child.",
))
}
}
#[cfg(test)]
mod tests {
    // Unit tests for computing projection masks / reorder indices (`get_requested_indices`)
    // and for applying a reordering to arrow data (`reorder_struct_array`).
    use std::sync::Arc;

    use arrow::{
        array::AsArray,
        buffer::{OffsetBuffer, ScalarBuffer},
    };
    use arrow_array::{
        Array, ArrayRef as ArrowArrayRef, BooleanArray, GenericListArray, Int32Array, StructArray,
    };
    use arrow_schema::{
        DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema,
        SchemaRef as ArrowSchemaRef,
    };

    use crate::schema::{ArrayType, DataType, MapType, StructField, StructType};

    use super::{get_requested_indices, reorder_struct_array, ReorderIndex};

    /// Schema with leaf columns: i (0), nested.int32 (1), nested.string (2), j (3).
    fn nested_parquet_schema() -> ArrowSchemaRef {
        Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new(
                "nested",
                ArrowDataType::Struct(
                    vec![
                        ArrowField::new("int32", ArrowDataType::Int32, false),
                        ArrowField::new("string", ArrowDataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
            ArrowField::new("j", ArrowDataType::Int32, false),
        ]))
    }

    #[test]
    fn simple_mask_indices() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new("s", DataType::STRING, true),
            StructField::new("i2", DataType::INTEGER, true),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new("s", ArrowDataType::Utf8, true),
            ArrowField::new("i2", ArrowDataType::Int32, true),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1, 2];
        let expect_reorder = vec![
            ReorderIndex::identity(0),
            ReorderIndex::identity(1),
            ReorderIndex::identity(2),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn ensure_data_types_fails_correctly() {
        // Requested INTEGER, file has Utf8.
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new("s", DataType::INTEGER, true),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new("s", ArrowDataType::Utf8, true),
        ]));
        let res = get_requested_indices(&requested_schema, &parquet_schema);
        assert!(res.is_err());

        // Requested STRING, file has Int32.
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new("s", DataType::STRING, true),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new("s", ArrowDataType::Int32, true),
        ]));
        let res = get_requested_indices(&requested_schema, &parquet_schema);
        assert!(res.is_err());
    }

    #[test]
    fn mask_with_map() {
        let requested_schema = Arc::new(StructType::new(vec![StructField::new(
            "map",
            DataType::Map(Box::new(MapType::new(
                DataType::INTEGER,
                DataType::STRING,
                false,
            ))),
            false,
        )]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new_map(
            "map",
            "entries",
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new("s", ArrowDataType::Utf8, false),
            false,
            false,
        )]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1];
        let expect_reorder = vec![ReorderIndex::identity(0)];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn simple_reorder_indices() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new("s", DataType::STRING, true),
            StructField::new("i2", DataType::INTEGER, true),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i2", ArrowDataType::Int32, true),
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new("s", ArrowDataType::Utf8, true),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1, 2];
        let expect_reorder = vec![
            ReorderIndex::identity(2),
            ReorderIndex::identity(0),
            ReorderIndex::identity(1),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn simple_nullable_field_missing() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new("s", DataType::STRING, true),
            StructField::new("i2", DataType::INTEGER, true),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new("i2", ArrowDataType::Int32, true),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1];
        let expect_reorder = vec![
            ReorderIndex::identity(0),
            ReorderIndex::identity(2),
            ReorderIndex::missing(1, Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true))),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn nested_indices() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new(
                "nested",
                StructType::new(vec![
                    StructField::new("int32", DataType::INTEGER, false),
                    StructField::new("string", DataType::STRING, false),
                ]),
                false,
            ),
            StructField::new("j", DataType::INTEGER, false),
        ]));
        let parquet_schema = nested_parquet_schema();
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1, 2, 3];
        let expect_reorder = vec![
            ReorderIndex::identity(0),
            ReorderIndex::nested(
                1,
                vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
            ),
            ReorderIndex::identity(2),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn nested_indices_reorder() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new(
                "nested",
                StructType::new(vec![
                    StructField::new("string", DataType::STRING, false),
                    StructField::new("int32", DataType::INTEGER, false),
                ]),
                false,
            ),
            StructField::new("j", DataType::INTEGER, false),
            StructField::new("i", DataType::INTEGER, false),
        ]));
        let parquet_schema = nested_parquet_schema();
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1, 2, 3];
        let expect_reorder = vec![
            ReorderIndex::identity(2),
            ReorderIndex::nested(
                0,
                vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
            ),
            ReorderIndex::identity(1),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn nested_indices_mask_inner() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new(
                "nested",
                StructType::new(vec![StructField::new("int32", DataType::INTEGER, false)]),
                false,
            ),
            StructField::new("j", DataType::INTEGER, false),
        ]));
        let parquet_schema = nested_parquet_schema();
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        // Leaf 2 (nested.string) is not requested.
        let expect_mask = vec![0, 1, 3];
        let expect_reorder = vec![
            ReorderIndex::identity(0),
            ReorderIndex::nested(1, vec![ReorderIndex::identity(0)]),
            ReorderIndex::identity(2),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn simple_list_mask() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new("list", ArrayType::new(DataType::INTEGER, false), false),
            StructField::new("j", DataType::INTEGER, false),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new(
                "list",
                ArrowDataType::List(Arc::new(ArrowField::new(
                    "nested",
                    ArrowDataType::Int32,
                    false,
                ))),
                false,
            ),
            ArrowField::new("j", ArrowDataType::Int32, false),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1, 2];
        let expect_reorder = vec![
            ReorderIndex::identity(0),
            ReorderIndex::identity(1),
            ReorderIndex::identity(2),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn list_skip_earlier_element() {
        let requested_schema = Arc::new(StructType::new(vec![StructField::new(
            "list",
            ArrayType::new(DataType::INTEGER, false),
            false,
        )]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new(
                "list",
                ArrowDataType::List(Arc::new(ArrowField::new(
                    "nested",
                    ArrowDataType::Int32,
                    false,
                ))),
                false,
            ),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![1];
        let expect_reorder = vec![ReorderIndex::identity(0)];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn nested_indices_list() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new(
                "list",
                ArrayType::new(
                    StructType::new(vec![
                        StructField::new("int32", DataType::INTEGER, false),
                        StructField::new("string", DataType::STRING, false),
                    ])
                    .into(),
                    false,
                ),
                false,
            ),
            StructField::new("j", DataType::INTEGER, false),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new(
                "list",
                ArrowDataType::List(Arc::new(ArrowField::new(
                    "nested",
                    ArrowDataType::Struct(
                        vec![
                            ArrowField::new("int32", ArrowDataType::Int32, false),
                            ArrowField::new("string", ArrowDataType::Utf8, false),
                        ]
                        .into(),
                    ),
                    false,
                ))),
                false,
            ),
            ArrowField::new("j", ArrowDataType::Int32, false),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1, 2, 3];
        let expect_reorder = vec![
            ReorderIndex::identity(0),
            ReorderIndex::nested(
                1,
                vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
            ),
            ReorderIndex::identity(2),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn nested_indices_unselected_list() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new("j", DataType::INTEGER, false),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new(
                "list",
                ArrowDataType::List(Arc::new(ArrowField::new(
                    "nested",
                    ArrowDataType::Struct(
                        vec![
                            ArrowField::new("int32", ArrowDataType::Int32, false),
                            ArrowField::new("string", ArrowDataType::Utf8, false),
                        ]
                        .into(),
                    ),
                    false,
                ))),
                false,
            ),
            ArrowField::new("j", ArrowDataType::Int32, false),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        // The skipped list occupies leaves 1 and 2.
        let expect_mask = vec![0, 3];
        let expect_reorder = vec![ReorderIndex::identity(0), ReorderIndex::identity(1)];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn nested_indices_list_mask_inner() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new(
                "list",
                ArrayType::new(
                    StructType::new(vec![StructField::new("int32", DataType::INTEGER, false)])
                        .into(),
                    false,
                ),
                false,
            ),
            StructField::new("j", DataType::INTEGER, false),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new(
                "list",
                ArrowDataType::List(Arc::new(ArrowField::new(
                    "nested",
                    ArrowDataType::Struct(
                        vec![
                            ArrowField::new("int32", ArrowDataType::Int32, false),
                            ArrowField::new("string", ArrowDataType::Utf8, false),
                        ]
                        .into(),
                    ),
                    false,
                ))),
                false,
            ),
            ArrowField::new("j", ArrowDataType::Int32, false),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 1, 3];
        let expect_reorder = vec![
            ReorderIndex::identity(0),
            ReorderIndex::nested(1, vec![ReorderIndex::identity(0)]),
            ReorderIndex::identity(2),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn nested_indices_list_mask_inner_reorder() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new(
                "list",
                ArrayType::new(
                    StructType::new(vec![
                        StructField::new("string", DataType::STRING, false),
                        StructField::new("int2", DataType::INTEGER, false),
                    ])
                    .into(),
                    false,
                ),
                false,
            ),
            StructField::new("j", DataType::INTEGER, false),
        ]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false), // leaf 0
            ArrowField::new(
                "list",
                ArrowDataType::List(Arc::new(ArrowField::new(
                    "nested",
                    ArrowDataType::Struct(
                        vec![
                            ArrowField::new("int1", ArrowDataType::Int32, false), // leaf 1
                            ArrowField::new("int2", ArrowDataType::Int32, false), // leaf 2
                            ArrowField::new("string", ArrowDataType::Utf8, false), // leaf 3
                        ]
                        .into(),
                    ),
                    false,
                ))),
                false,
            ),
            ArrowField::new("j", ArrowDataType::Int32, false), // leaf 4
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![0, 2, 3, 4];
        let expect_reorder = vec![
            ReorderIndex::identity(0),
            ReorderIndex::nested(
                1,
                vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
            ),
            ReorderIndex::identity(2),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn skipped_struct() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("i", DataType::INTEGER, false),
            StructField::new(
                "nested",
                StructType::new(vec![
                    StructField::new("int32", DataType::INTEGER, false),
                    StructField::new("string", DataType::STRING, false),
                ]),
                false,
            ),
            StructField::new("j", DataType::INTEGER, false),
        ]));
        // Leaves: skipped.int32 (0), skipped.string (1), j (2), nested.int32 (3),
        // nested.string (4), i (5).
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new(
                "skipped",
                ArrowDataType::Struct(
                    vec![
                        ArrowField::new("int32", ArrowDataType::Int32, false),
                        ArrowField::new("string", ArrowDataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
            ArrowField::new("j", ArrowDataType::Int32, false),
            ArrowField::new(
                "nested",
                ArrowDataType::Struct(
                    vec![
                        ArrowField::new("int32", ArrowDataType::Int32, false),
                        ArrowField::new("string", ArrowDataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
            ArrowField::new("i", ArrowDataType::Int32, false),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask = vec![2, 3, 4, 5];
        let expect_reorder = vec![
            ReorderIndex::identity(2),
            ReorderIndex::nested(
                1,
                vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
            ),
            ReorderIndex::identity(0),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    /// A four-row struct array with columns `b: Boolean` and `c: Int32`.
    fn make_struct_array() -> StructArray {
        let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
        let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
        StructArray::from(vec![
            (
                Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
                boolean.clone() as ArrowArrayRef,
            ),
            (
                Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
                int.clone() as ArrowArrayRef,
            ),
        ])
    }

    #[test]
    fn simple_reorder_struct() {
        let arry = make_struct_array();
        let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)];
        let ordered = reorder_struct_array(arry, &reorder).unwrap();
        assert_eq!(ordered.column_names(), vec!["c", "b"]);
    }

    #[test]
    fn nested_reorder_struct() {
        let arry1 = Arc::new(make_struct_array());
        let arry2 = Arc::new(make_struct_array());
        let fields: Fields = vec![
            Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
            Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
        ]
        .into();
        let nested = StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "struct1",
                    ArrowDataType::Struct(fields.clone()),
                    false,
                )),
                arry1 as ArrowArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "struct2",
                    ArrowDataType::Struct(fields),
                    false,
                )),
                arry2 as ArrowArrayRef,
            ),
        ]);
        let reorder = vec![
            ReorderIndex::nested(
                1,
                vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
            ),
            ReorderIndex::nested(
                0,
                vec![
                    ReorderIndex::identity(0),
                    ReorderIndex::identity(1),
                    ReorderIndex::missing(
                        2,
                        Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true)),
                    ),
                ],
            ),
        ];
        let ordered = reorder_struct_array(nested, &reorder).unwrap();
        assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]);
        let ordered_s2 = ordered.column(0).as_struct();
        assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]);
        let ordered_s1 = ordered.column(1).as_struct();
        assert_eq!(ordered_s1.column_names(), vec!["c", "b"]);
    }

    #[test]
    fn reorder_list_of_struct() {
        let boolean = Arc::new(BooleanArray::from(vec![
            false, false, true, true, false, true,
        ]));
        let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31, 0, 3]));
        let list_sa = StructArray::from(vec![
            (
                Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
                boolean.clone() as ArrowArrayRef,
            ),
            (
                Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
                int.clone() as ArrowArrayRef,
            ),
        ]);
        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6]));
        let list_field = ArrowField::new("item", list_sa.data_type().clone(), false);
        let list = Arc::new(GenericListArray::new(
            Arc::new(list_field),
            offsets,
            Arc::new(list_sa),
            None,
        ));
        let fields: Fields = vec![
            Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
            Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
        ]
        .into();
        let list_dt = Arc::new(ArrowField::new(
            "list",
            ArrowDataType::new_list(ArrowDataType::Struct(fields), false),
            false,
        ));
        let struct_array = StructArray::from(vec![(list_dt, list as ArrowArrayRef)]);
        let reorder = vec![ReorderIndex::nested(
            0,
            vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
        )];
        let ordered = reorder_struct_array(struct_array, &reorder).unwrap();
        let ordered_list_col = ordered.column(0).as_list::<i32>();
        for i in 0..ordered_list_col.len() {
            let array_item = ordered_list_col.value(i);
            let struct_item = array_item.as_struct();
            assert_eq!(struct_item.column_names(), vec!["c", "b"]);
        }
    }

    #[test]
    fn no_matches() {
        let requested_schema = Arc::new(StructType::new(vec![
            StructField::new("s", DataType::STRING, true),
            StructField::new("i2", DataType::INTEGER, true),
        ]));
        let nots_field = ArrowField::new("NOTs", ArrowDataType::Utf8, true);
        let noti2_field = ArrowField::new("NOTi2", ArrowDataType::Int32, true);
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            nots_field.clone(),
            noti2_field.clone(),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask: Vec<usize> = vec![];
        let expect_reorder = vec![
            ReorderIndex::missing(0, nots_field.with_name("s").into()),
            ReorderIndex::missing(1, noti2_field.with_name("i2").into()),
        ];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }

    #[test]
    fn empty_requested_schema() {
        let requested_schema = Arc::new(StructType::new(vec![]));
        let parquet_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", ArrowDataType::Int32, false),
            ArrowField::new("s", ArrowDataType::Utf8, true),
            ArrowField::new("i2", ArrowDataType::Int32, true),
        ]));
        let (mask_indices, reorder_indices) =
            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
        let expect_mask: Vec<usize> = vec![];
        let expect_reorder = vec![];
        assert_eq!(mask_indices, expect_mask);
        assert_eq!(reorder_indices, expect_reorder);
    }
}