use std::collections::HashSet;
use std::io::{BufRead, BufReader};
use std::sync::Arc;
use crate::engine::ensure_data_types::DataTypeCompat;
use crate::{
engine::arrow_data::ArrowEngineData,
schema::{DataType, Schema, SchemaRef, StructField, StructType},
utils::require,
DeltaResult, EngineData, Error,
};
use arrow_array::{
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
RecordBatch, StringArray, StructArray,
};
use arrow_json::{LineDelimitedWriter, ReaderBuilder};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
SchemaRef as ArrowSchemaRef,
};
use arrow_select::concat::concat_batches;
use itertools::Itertools;
use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor};
use tracing::debug;
/// Expands to a `return` statement that evaluates an "in list" comparison between a primitive
/// array and a list array.
///
/// * `$left_arr` - identifier of the array holding the values to look up; it is downcast to a
///   primitive array of the `$prim_ty` paired with the first `$data_ty` pattern that matches
///   its data type.
/// * `$right_arr` - identifier of the array holding the lists to search; it is downcast to a
///   list array with `i32` offsets.
/// * `($data_ty, $prim_ty)` - one or more (data type pattern, arrow primitive type) pairs.
///
/// If no pattern matches, or a downcast fails, the enclosing function returns an error.
///
/// The expansion refers to `Error`, `ArrowError`, `arrow_ord` and `wrap_comparison_result` by
/// name; the macro does not import them, so they must be resolvable where it is invoked.
macro_rules! prim_array_cmp {
    ( $left_arr: ident, $right_arr: ident, $(($data_ty: pat, $prim_ty: ty)),+ ) => {
        return match $left_arr.data_type() {
        $(
            $data_ty => {
                // `ok_or_else` keeps the error message from being formatted (and allocated)
                // unless the downcast actually fails.
                let prim_array = $left_arr.as_primitive_opt::<$prim_ty>()
                    .ok_or_else(|| Error::invalid_expression(
                        format!("Cannot cast to primitive array: {}", $left_arr.data_type()))
                    )?;
                let list_array = $right_arr.as_list_opt::<i32>()
                    .ok_or_else(|| Error::invalid_expression(
                        format!("Cannot cast to list array: {}", $right_arr.data_type()))
                    )?;
                arrow_ord::comparison::in_list(prim_array, list_array).map(wrap_comparison_result)
            }
        )+
            _ => Err(ArrowError::CastError(
                    format!("Bad Comparison between: {:?} and {:?}",
                        $left_arr.data_type(),
                        $right_arr.data_type())
                    )
                )
        }.map_err(Error::generic_err);
    };
}
pub(crate) use prim_array_cmp;
pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s.into())).with_backtrace()
}
/// One entry of a reordering plan, as produced by `get_indices` and consumed by
/// `reorder_struct_array`.
///
/// The position of an entry inside its containing slice identifies the input column it applies
/// to (entries whose transform is `Missing` do not read any input column), while `index` is the
/// position the resulting column takes in the output.
#[derive(Debug, PartialEq)]
pub(crate) struct ReorderIndex {
/// Position of the resulting column in the output struct.
pub(crate) index: usize,
/// What has to be done to the input column to obtain the output column.
transform: ReorderIndexTransform,
}
/// The kinds of work `reorder_struct_array` may have to do for a single column.
#[derive(Debug, PartialEq)]
pub(crate) enum ReorderIndexTransform {
/// The input column is cast to the contained arrow data type.
Cast(ArrowDataType),
/// The input column is a struct or a list of structs whose children are themselves reordered
/// according to the contained plan.
Nested(Vec<ReorderIndex>),
/// The input column (and its field) is used unchanged.
Identity,
/// There is no input column; an all-null column described by the contained field is produced.
Missing(ArrowFieldRef),
}
impl ReorderIndex {
    /// Creates an entry that places a column at output position `index` after applying
    /// `transform`.
    fn new(index: usize, transform: ReorderIndexTransform) -> Self {
        Self { index, transform }
    }

    /// Entry whose column must be cast to `target`.
    fn cast(index: usize, target: ArrowDataType) -> Self {
        Self::new(index, ReorderIndexTransform::Cast(target))
    }

    /// Entry whose column is nested and whose children follow the plan in `children`.
    fn nested(index: usize, children: Vec<ReorderIndex>) -> Self {
        Self::new(index, ReorderIndexTransform::Nested(children))
    }

    /// Entry whose column is used as-is.
    fn identity(index: usize) -> Self {
        Self::new(index, ReorderIndexTransform::Identity)
    }

    /// Entry for a column that is absent from the input and is described by `field`.
    fn missing(index: usize, field: ArrowFieldRef) -> Self {
        Self::new(index, ReorderIndexTransform::Missing(field))
    }

    /// Returns `true` when the column itself needs work: always for `Cast` and `Missing`,
    /// never for `Identity`, and for `Nested` whenever the child plan needs a transform.
    fn needs_transform(&self) -> bool {
        match &self.transform {
            ReorderIndexTransform::Identity => false,
            ReorderIndexTransform::Nested(children) => ordering_needs_transform(children),
            ReorderIndexTransform::Cast(_) | ReorderIndexTransform::Missing(_) => true,
        }
    }
}
/// Counts the leaf columns underneath `field`, i.e. how many non-nested columns are reached
/// when descending through every nested type it contains.
fn count_cols(field: &ArrowField) -> usize {
    _count_cols(field.data_type())
}

/// Counts the leaf columns underneath the data type `dt`.
///
/// Structs and unions contribute the sum of their children, single-child wrappers (all list
/// flavours and maps) contribute whatever their child contributes, dictionaries contribute
/// whatever their value type contributes, and every other type counts as one leaf.
fn _count_cols(dt: &ArrowDataType) -> usize {
    match dt {
        ArrowDataType::Struct(fields) => fields.iter().map(|f| count_cols(f)).sum(),
        ArrowDataType::Union(fields, _) => fields.iter().map(|(_, f)| count_cols(f)).sum(),
        // The list-view variants are included so that a skipped list-view column is counted
        // the same way `get_indices` walks a selected one (it recurses into `ListView`).
        ArrowDataType::List(field)
        | ArrowDataType::LargeList(field)
        | ArrowDataType::ListView(field)
        | ArrowDataType::LargeListView(field)
        | ArrowDataType::FixedSizeList(field, _)
        | ArrowDataType::Map(field, _) => count_cols(field),
        ArrowDataType::Dictionary(_, value_field) => _count_cols(value_field.as_ref()),
        // Anything else is treated as a single leaf column.
        _ => 1,
    }
}
/// Matches the arrow `fields` of one nesting level of a parquet file against `requested_schema`
/// (by field name) and works out which leaf columns to read and how to reorder the result.
///
/// * `start_parquet_offset` - offset added to a field's position in `fields` to obtain leaf
///   indices; callers pass 0 at the top level and the leaf index of the parent when recursing.
/// * `requested_schema` - the fields wanted at this level.
/// * `fields` - the fields present in the file at this level.
/// * `mask_indices` - output parameter; the leaf index of every selected non-nested column is
///   appended to it.
///
/// Returns `(advance, reorder_indices)`. `advance` is
/// `parquet_offset + fields.len() - start_parquet_offset`, which works out to the number of leaf
/// columns covered by `fields`. `reorder_indices` holds one entry per matched field, in file
/// order, followed by one `Missing` entry per requested nullable field that was not found.
///
/// Returns an error when a matched field has an incompatible type, or when a requested
/// non-nullable field is not present in `fields`.
fn get_indices(
start_parquet_offset: usize,
requested_schema: &Schema,
fields: &Fields,
mask_indices: &mut Vec<usize>,
) -> DeltaResult<(usize, Vec<ReorderIndex>)> {
let mut found_fields = HashSet::with_capacity(requested_schema.fields.len());
let mut reorder_indices = Vec::with_capacity(requested_schema.fields.len());
// `parquet_offset + parquet_index` is the leaf index of the field currently being visited.
// The offset grows by (leaf count - 1) for every nested or skipped field passed so far.
let mut parquet_offset = start_parquet_offset;
// Pair every file field with its match (if any) in the requested schema.
let all_field_info = fields.iter().enumerate().map(|(parquet_index, field)| {
let field_info = requested_schema.fields.get_full(field.name());
(parquet_index, field, field_info)
});
for (parquet_index, field, field_info) in all_field_info {
debug!(
"Getting indices for field {} with offset {parquet_offset}, with index {parquet_index}",
field.name()
);
if let Some((index, _, requested_field)) = field_info {
match field.data_type() {
ArrowDataType::Struct(fields) => {
// Struct in the file: the requested field must be a struct too; recurse into it.
if let DataType::Struct(ref requested_schema) = requested_field.data_type {
let (parquet_advance, children) = get_indices(
parquet_index + parquet_offset,
requested_schema.as_ref(),
fields,
mask_indices,
)?;
// The struct occupies `parquet_advance` leaves; one is already accounted for by
// `parquet_index` itself.
parquet_offset += parquet_advance - 1;
found_fields.insert(requested_field.name());
reorder_indices.push(ReorderIndex::nested(index, children));
} else {
return Err(Error::unexpected_column_type(field.name()));
}
}
ArrowDataType::List(list_field)
| ArrowDataType::LargeList(list_field)
| ArrowDataType::ListView(list_field) => {
if let DataType::Array(array_type) = requested_field.data_type() {
// Wrap the requested element type in a one-field struct named after the file's
// element field so the recursive call can match it by name.
let requested_schema = StructType::new([StructField::new(
list_field.name().clone(), array_type.element_type.clone(),
array_type.contains_null,
)]);
let (parquet_advance, mut children) = get_indices(
parquet_index + parquet_offset,
&requested_schema,
&[list_field.clone()].into(),
mask_indices,
)?;
parquet_offset += parquet_advance - 1;
found_fields.insert(requested_field.name());
if children.len() != 1 {
return Err(Error::generic(
"List call should not have generated more than one reorder index",
));
}
// The single child entry describes the element; re-target it at the list's own
// position in the requested schema.
let mut children = children.swap_remove(0);
children.index = index;
reorder_indices.push(children);
} else {
return Err(Error::unexpected_column_type(list_field.name()));
}
}
ArrowDataType::Map(key_val_field, _) => {
match (key_val_field.data_type(), requested_field.data_type()) {
(ArrowDataType::Struct(inner_fields), DataType::Map(map_type)) => {
// The entries struct must have exactly two members: key, then value.
let mut key_val_names =
inner_fields.iter().map(|f| f.name().to_string());
let key_name = key_val_names.next().ok_or_else(|| {
Error::generic("map fields didn't include a key col")
})?;
let val_name = key_val_names.next().ok_or_else(|| {
Error::generic("map fields didn't include a val col")
})?;
if key_val_names.next().is_some() {
return Err(Error::generic("map fields had more than 2 members"));
}
let inner_schema = map_type.as_struct_schema(key_name, val_name);
// The recursion records the leaf indices and advance; the child reorder plan is
// discarded and the map itself is recorded as an identity entry.
let (parquet_advance, _children) = get_indices(
parquet_index + parquet_offset,
&inner_schema,
inner_fields,
mask_indices,
)?;
parquet_offset += parquet_advance - 1;
found_fields.insert(requested_field.name());
reorder_indices.push(ReorderIndex::identity(index));
}
_ => {
return Err(Error::unexpected_column_type(field.name()));
}
}
}
_ => {
// Any other type is handled as a single leaf: check type compatibility and record
// its leaf index.
match super::ensure_data_types::ensure_data_types(
&requested_field.data_type,
field.data_type(),
false,
)? {
DataTypeCompat::Identical => {
reorder_indices.push(ReorderIndex::identity(index))
}
DataTypeCompat::NeedsCast(target) => {
reorder_indices.push(ReorderIndex::cast(index, target))
}
DataTypeCompat::Nested => {
return Err(Error::internal_error(
"Comparing nested types in get_indices",
))
}
}
found_fields.insert(requested_field.name());
mask_indices.push(parquet_offset + parquet_index);
}
}
} else {
// Not requested: skip all of its leaves so later leaf indices stay correct.
debug!("Skipping over un-selected field: {}", field.name());
parquet_offset += count_cols(field) - 1;
}
}
// Requested fields that are absent from the file: nullable ones become `Missing` entries,
// non-nullable ones are an error.
if found_fields.len() != requested_schema.fields.len() {
for (requested_position, field) in requested_schema.fields().enumerate() {
if !found_fields.contains(field.name()) {
if field.nullable {
debug!("Inserting missing and nullable field: {}", field.name());
reorder_indices.push(ReorderIndex::missing(
requested_position,
Arc::new(field.try_into()?),
));
} else {
return Err(Error::Generic(format!(
"Requested field not found in parquet schema, and field is not nullable: {}",
field.name()
)));
}
}
}
}
Ok((
parquet_offset + fields.len() - start_parquet_offset,
reorder_indices,
))
}
/// Computes, for the top level of `parquet_schema`, the leaf-column indices needed to read
/// `requested_schema` together with the plan for reordering the data once it has been read.
///
/// Returns `(mask_indices, reorder_indices)`; any error from `get_indices` is propagated.
pub(crate) fn get_requested_indices(
    requested_schema: &SchemaRef,
    parquet_schema: &ArrowSchemaRef,
) -> DeltaResult<(Vec<usize>, Vec<ReorderIndex>)> {
    let mut leaf_indices = Vec::new();
    let top_level_fields = parquet_schema.fields();
    // The leaf count returned alongside the plan is only needed when recursing.
    let (_leaf_count, reorder_plan) =
        get_indices(0, requested_schema, top_level_fields, &mut leaf_indices)?;
    Ok((leaf_indices, reorder_plan))
}
/// Builds the parquet [`ProjectionMask`] that selects the leaf columns listed in `indices`.
///
/// `_requested_schema` and `_parquet_schema` are not used by the current implementation, and
/// the function always returns `Some`.
pub(crate) fn generate_mask(
    _requested_schema: &SchemaRef,
    _parquet_schema: &ArrowSchemaRef,
    parquet_physical_schema: &SchemaDescriptor,
    indices: &[usize],
) -> Option<ProjectionMask> {
    // `ProjectionMask::leaves` takes any `IntoIterator<Item = usize>`, so the indices are
    // streamed from the slice rather than first being copied into a temporary `Vec`.
    Some(ProjectionMask::leaves(
        parquet_physical_schema,
        indices.iter().copied(),
    ))
}
/// Decides whether applying `requested_ordering` would change anything.
///
/// An empty plan never needs a transform. Otherwise a transform is needed as soon as any entry
/// needs one itself, or the output positions are not strictly increasing from one entry to the
/// next.
fn ordering_needs_transform(requested_ordering: &[ReorderIndex]) -> bool {
    let Some(first) = requested_ordering.first() else {
        return false;
    };
    if first.needs_transform() {
        return true;
    }
    // Compare every entry with its successor.
    requested_ordering
        .iter()
        .zip(requested_ordering.iter().skip(1))
        .any(|(prev, next)| prev.index >= next.index || next.needs_transform())
}
/// An optional (field, column) pair. `reorder_struct_array` uses `None` for an output slot that
/// has not been filled in.
type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
pub(crate) fn reorder_struct_array(
input_data: StructArray,
requested_ordering: &[ReorderIndex],
) -> DeltaResult<StructArray> {
debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
if !ordering_needs_transform(requested_ordering) {
Ok(input_data)
} else {
debug!("Have requested reorder {requested_ordering:#?} on {input_data:?}");
let num_rows = input_data.len();
let num_cols = requested_ordering.len();
let (input_fields, input_cols, null_buffer) = input_data.into_parts();
let mut final_fields_cols: Vec<FieldArrayOpt> = vec![None; num_cols];
for (parquet_position, reorder_index) in requested_ordering.iter().enumerate() {
match &reorder_index.transform {
ReorderIndexTransform::Cast(target) => {
let col = input_cols[parquet_position].as_ref();
let col = Arc::new(arrow_cast::cast::cast(col, target)?);
let new_field = Arc::new(
input_fields[parquet_position]
.as_ref()
.clone()
.with_data_type(col.data_type().clone()),
);
final_fields_cols[reorder_index.index] = Some((new_field, col));
}
ReorderIndexTransform::Nested(children) => {
match input_cols[parquet_position].data_type() {
ArrowDataType::Struct(_) => {
let struct_array = input_cols[parquet_position].as_struct().clone();
let result_array =
Arc::new(reorder_struct_array(struct_array, children)?);
let new_field = Arc::new(ArrowField::new_struct(
input_fields[parquet_position].name(),
result_array.fields().clone(),
input_fields[parquet_position].is_nullable(),
));
final_fields_cols[reorder_index.index] =
Some((new_field, result_array));
}
ArrowDataType::List(_) => {
let list_array = input_cols[parquet_position].as_list::<i32>().clone();
final_fields_cols[reorder_index.index] = reorder_list(
list_array,
input_fields[parquet_position].name(),
children,
)?;
}
ArrowDataType::LargeList(_) => {
let list_array = input_cols[parquet_position].as_list::<i64>().clone();
final_fields_cols[reorder_index.index] = reorder_list(
list_array,
input_fields[parquet_position].name(),
children,
)?;
}
_ => {
return Err(Error::internal_error(
"Nested reorder can only apply to struct/list/map.",
));
}
}
}
ReorderIndexTransform::Identity => {
final_fields_cols[reorder_index.index] = Some((
input_fields[parquet_position].clone(), input_cols[parquet_position].clone(), ));
}
ReorderIndexTransform::Missing(field) => {
let null_array = Arc::new(new_null_array(field.data_type(), num_rows));
let field = field.clone(); final_fields_cols[reorder_index.index] = Some((field, null_array));
}
}
}
let num_cols = final_fields_cols.len();
let (field_vec, reordered_columns): (Vec<Arc<ArrowField>>, _) =
final_fields_cols.into_iter().flatten().unzip();
if field_vec.len() != num_cols {
Err(Error::internal_error("Found a None in final_fields_cols."))
} else {
Ok(StructArray::try_new(
field_vec.into(),
reordered_columns,
null_buffer,
)?)
}
}
}
fn reorder_list<O: OffsetSizeTrait>(
list_array: GenericListArray<O>,
input_field_name: &str,
children: &[ReorderIndex],
) -> DeltaResult<FieldArrayOpt> {
let (list_field, offset_buffer, maybe_sa, null_buf) = list_array.into_parts();
if let Some(struct_array) = maybe_sa.as_struct_opt() {
let struct_array = struct_array.clone();
let result_array = Arc::new(reorder_struct_array(struct_array, children)?);
let new_list_field = Arc::new(ArrowField::new_struct(
list_field.name(),
result_array.fields().clone(),
result_array.is_nullable(),
));
let new_field = Arc::new(ArrowField::new_list(
input_field_name,
new_list_field.clone(),
list_field.is_nullable(),
));
let list = Arc::new(GenericListArray::try_new(
new_list_field,
offset_buffer,
result_array,
null_buf,
)?);
Ok(Some((new_field, list)))
} else {
Err(Error::internal_error(
"Nested reorder of list should have had struct child.",
))
}
}
pub(crate) fn parse_json(
json_strings: Box<dyn EngineData>,
schema: SchemaRef,
) -> DeltaResult<Box<dyn EngineData>> {
let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into();
let json_strings = json_strings
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
Error::generic("Expected json_strings to be a StringArray, found something else")
})?;
let schema: ArrowSchemaRef = Arc::new(schema.as_ref().try_into()?);
let result = parse_json_impl(json_strings, schema)?;
Ok(Box::new(ArrowEngineData::new(result)))
}
/// Parses every entry of `json_strings` into one row of a record batch with the given `schema`.
///
/// A null entry is parsed as the empty object `{}`. An empty input array yields an empty batch.
///
/// # Errors
///
/// Returns an error when the decoder does not consume an entire entry, when an entry produces
/// no row, or when it produces anything other than exactly one row. I/O, decoding and
/// concatenation errors are propagated.
fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaResult<RecordBatch> {
    if json_strings.is_empty() {
        return Ok(RecordBatch::new_empty(schema));
    }
    // NOTE(review): the batch size of 1 is presumably what makes the decoder stop consuming
    // input once one row is complete, so trailing data shows up as a short `decode` count.
    let mut decoder = ReaderBuilder::new(schema.clone())
        .with_batch_size(1)
        .build_decoder()?;
    let parse_one = |json_string: Option<&str>| -> DeltaResult<RecordBatch> {
        let mut reader = BufReader::new(json_string.unwrap_or("{}").as_bytes());
        // A single `fill_buf` call returns at most one buffer's worth of data (8 KiB with the
        // default capacity), so keep feeding the decoder until the whole string has been
        // handed over. Otherwise longer strings would be cut off and rejected.
        loop {
            let buf = reader.fill_buf()?;
            if buf.is_empty() {
                break;
            }
            let read = buf.len();
            require!(
                decoder.decode(buf)? == read,
                Error::missing_data("Incomplete JSON string")
            );
            reader.consume(read);
        }
        let Some(batch) = decoder.flush()? else {
            return Err(Error::missing_data("Expected data"));
        };
        require!(batch.num_rows() == 1, Error::generic("Expected one row"));
        Ok(batch)
    };
    let output: Vec<_> = json_strings.iter().map(parse_one).try_collect()?;
    Ok(concat_batches(&schema, output.iter())?)
}
/// Serializes every chunk produced by `data` as line-delimited JSON and returns the bytes.
///
/// Each chunk must be arrow-backed engine data. The first error yielded by the iterator, by
/// the conversion, or by the writer stops the serialization and is returned.
pub(crate) fn to_json_bytes(
    data: impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send,
) -> DeltaResult<Vec<u8>> {
    let mut json_writer = LineDelimitedWriter::new(Vec::new());
    for maybe_chunk in data {
        let engine_data = maybe_chunk?;
        let arrow_data = ArrowEngineData::try_from_engine_data(engine_data)?;
        json_writer.write(arrow_data.record_batch())?;
    }
    json_writer.finish()?;
    Ok(json_writer.into_inner())
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::{
array::AsArray,
buffer::{OffsetBuffer, ScalarBuffer},
};
use arrow_array::{
Array, ArrayRef as ArrowArrayRef, BooleanArray, GenericListArray, Int32Array, StructArray,
};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef,
};
use crate::schema::{ArrayType, DataType, MapType, StructField, StructType};
use super::*;
// Shared fixture: a file schema with an int column `i`, a two-field struct `nested`
// (`int32`, `string`), and an int column `j`, all non-nullable.
fn nested_parquet_schema() -> ArrowSchemaRef {
Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new("int32", ArrowDataType::Int32, false),
ArrowField::new("string", ArrowDataType::Utf8, false),
]
.into(),
),
false,
),
ArrowField::new("j", ArrowDataType::Int32, false),
]))
}
// Exercises `parse_json_impl` with empty input, several malformed inputs that must be
// rejected, and a mix of null and valid entries.
#[test]
fn test_json_parsing() {
let requested_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("a", ArrowDataType::Int32, true),
ArrowField::new("b", ArrowDataType::Utf8, true),
ArrowField::new("c", ArrowDataType::Int32, true),
]));
// No strings at all: an empty batch.
let input: Vec<&str> = vec![];
let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
assert_eq!(result.num_rows(), 0);
// Empty string.
let input: Vec<Option<&str>> = vec![Some("")];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("empty string");
// Whitespace only.
let input: Vec<Option<&str>> = vec![Some(" \n\t")];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("empty string");
// A JSON string value instead of an object.
let input: Vec<Option<&str>> = vec![Some(r#""a""#)];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("invalid string");
// Unterminated object.
let input: Vec<Option<&str>> = vec![Some(r#"{ "a": 1"#)];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("incomplete object");
// Two complete objects in one string.
let input: Vec<Option<&str>> = vec![Some("{}{}")];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("multiple objects (complete)");
// One complete object followed by an unterminated one.
let input: Vec<Option<&str>> = vec![Some(r#"{} { "a": 1"#)];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("multiple objects (partial)");
// One object split across two entries.
let input: Vec<Option<&str>> = vec![Some(r#"{ "a": 1"#), Some(r#", "b"}"#)];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("split object");
// Null entries produce rows in which every column is null.
let input: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1, "b": "2", "c": 3}"#), None];
let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
assert_eq!(result.num_rows(), 3);
assert_eq!(result.column(0).null_count(), 2);
assert_eq!(result.column(1).null_count(), 2);
assert_eq!(result.column(2).null_count(), 2);
}
// Requested and file schemas are identical: every leaf is selected and nothing is reordered.
#[test]
fn simple_mask_indices() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new("s", DataType::STRING, true),
StructField::new("i2", DataType::INTEGER, true),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("s", ArrowDataType::Utf8, true),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(1),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// A type mismatch between the requested field and the file field is an error, in both
// directions (integer requested / string in file, and string requested / integer in file).
#[test]
fn ensure_data_types_fails_correctly() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new("s", DataType::INTEGER, true),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("s", ArrowDataType::Utf8, true),
]));
let res = get_requested_indices(&requested_schema, &parquet_schema);
assert!(res.is_err());
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new("s", DataType::STRING, true),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("s", ArrowDataType::Int32, true),
]));
let res = get_requested_indices(&requested_schema, &parquet_schema);
assert!(res.is_err());
}
// A map column selects both of its leaves (key and value) but yields a single identity entry.
#[test]
fn mask_with_map() {
let requested_schema = Arc::new(StructType::new([StructField::new(
"map",
MapType::new(DataType::INTEGER, DataType::STRING, false),
false,
)]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new_map(
"map",
"entries",
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("s", ArrowDataType::Utf8, false),
false,
false,
)]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let expect_reorder = vec![ReorderIndex::identity(0)];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// Same columns in a different order: all leaves are selected and each entry carries the
// position its column has in the requested schema.
#[test]
fn simple_reorder_indices() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new("s", DataType::STRING, true),
StructField::new("i2", DataType::INTEGER, true),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i2", ArrowDataType::Int32, true),
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("s", ArrowDataType::Utf8, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2];
let expect_reorder = vec![
ReorderIndex::identity(2),
ReorderIndex::identity(0),
ReorderIndex::identity(1),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// A requested nullable column that is absent from the file becomes a trailing `Missing` entry.
#[test]
fn simple_nullable_field_missing() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new("s", DataType::STRING, true),
StructField::new("i2", DataType::INTEGER, true),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::missing(1, Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true))),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// A fully selected struct column: its leaves are part of the mask and it gets a nested entry.
#[test]
fn nested_indices() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new(
"nested",
StructType::new([
StructField::new("int32", DataType::INTEGER, false),
StructField::new("string", DataType::STRING, false),
]),
false,
),
StructField::new("j", DataType::INTEGER, false),
]));
let parquet_schema = nested_parquet_schema();
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2, 3];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// Both the top-level columns and the struct's children are requested in a different order.
#[test]
fn nested_indices_reorder() {
let requested_schema = Arc::new(StructType::new([
StructField::new(
"nested",
StructType::new([
StructField::new("string", DataType::STRING, false),
StructField::new("int32", DataType::INTEGER, false),
]),
false,
),
StructField::new("j", DataType::INTEGER, false),
StructField::new("i", DataType::INTEGER, false),
]));
let parquet_schema = nested_parquet_schema();
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2, 3];
let expect_reorder = vec![
ReorderIndex::identity(2),
ReorderIndex::nested(
0,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
),
ReorderIndex::identity(1),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// Only one child of the struct is requested: the other child's leaf is left out of the mask.
#[test]
fn nested_indices_mask_inner() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new(
"nested",
StructType::new([StructField::new("int32", DataType::INTEGER, false)]),
false,
),
StructField::new("j", DataType::INTEGER, false),
]));
let parquet_schema = nested_parquet_schema();
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 3];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(1, vec![ReorderIndex::identity(0)]),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// A list of integers occupies one leaf and yields an identity entry.
#[test]
fn simple_list_mask() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new("list", ArrayType::new(DataType::INTEGER, false), false),
StructField::new("j", DataType::INTEGER, false),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new(
"list",
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Int32,
false,
))),
false,
),
ArrowField::new("j", ArrowDataType::Int32, false),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(1),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// Only the list is requested; the column before it in the file is skipped, so the list's leaf
// index is 1 while its output position is 0.
#[test]
fn list_skip_earlier_element() {
let requested_schema = Arc::new(StructType::new([StructField::new(
"list",
ArrayType::new(DataType::INTEGER, false),
false,
)]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new(
"list",
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Int32,
false,
))),
false,
),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![1];
let expect_reorder = vec![ReorderIndex::identity(0)];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// A list of structs with every struct field requested: a nested entry at the list's position.
#[test]
fn nested_indices_list() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new(
"list",
ArrayType::new(
StructType::new([
StructField::new("int32", DataType::INTEGER, false),
StructField::new("string", DataType::STRING, false),
])
.into(),
false,
),
false,
),
StructField::new("j", DataType::INTEGER, false),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new(
"list",
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new("int32", ArrowDataType::Int32, false),
ArrowField::new("string", ArrowDataType::Utf8, false),
]
.into(),
),
false,
))),
false,
),
ArrowField::new("j", ArrowDataType::Int32, false),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 2, 3];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// An unrequested list of structs between two requested columns: both of its leaves are
// skipped, so `j` is found at leaf index 3.
#[test]
fn nested_indices_unselected_list() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new("j", DataType::INTEGER, false),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new(
"list",
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new("int32", ArrowDataType::Int32, false),
ArrowField::new("string", ArrowDataType::Utf8, false),
]
.into(),
),
false,
))),
false,
),
ArrowField::new("j", ArrowDataType::Int32, false),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 3];
let expect_reorder = vec![ReorderIndex::identity(0), ReorderIndex::identity(1)];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// A list of structs with only one struct field requested: the other field's leaf is masked
// out.
#[test]
fn nested_indices_list_mask_inner() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new(
"list",
ArrayType::new(
StructType::new([StructField::new("int32", DataType::INTEGER, false)]).into(),
false,
),
false,
),
StructField::new("j", DataType::INTEGER, false),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new(
"list",
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new("int32", ArrowDataType::Int32, false),
ArrowField::new("string", ArrowDataType::Utf8, false),
]
.into(),
),
false,
))),
false,
),
ArrowField::new("j", ArrowDataType::Int32, false),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1, 3];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(1, vec![ReorderIndex::identity(0)]),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// A list of structs where one struct field (`int1`) is not requested and the remaining two are
// requested in the opposite order from the file.
#[test]
fn nested_indices_list_mask_inner_reorder() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new(
"list",
ArrayType::new(
StructType::new([
StructField::new("string", DataType::STRING, false),
StructField::new("int2", DataType::INTEGER, false),
])
.into(),
false,
),
false,
),
StructField::new("j", DataType::INTEGER, false),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false), ArrowField::new(
"list",
ArrowDataType::List(Arc::new(ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new("int1", ArrowDataType::Int32, false), ArrowField::new("int2", ArrowDataType::Int32, false), ArrowField::new("string", ArrowDataType::Utf8, false), ]
.into(),
),
false,
))),
false,
),
ArrowField::new("j", ArrowDataType::Int32, false), ]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
// Leaf 1 (`int1`) is the only one left out.
let expect_mask = vec![0, 2, 3, 4];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
),
ReorderIndex::identity(2),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// The file starts with an unrequested struct: its two leaves are skipped, and the remaining
// columns are selected and reordered.
#[test]
fn skipped_struct() {
let requested_schema = Arc::new(StructType::new([
StructField::new("i", DataType::INTEGER, false),
StructField::new(
"nested",
StructType::new([
StructField::new("int32", DataType::INTEGER, false),
StructField::new("string", DataType::STRING, false),
]),
false,
),
StructField::new("j", DataType::INTEGER, false),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(
"skipped",
ArrowDataType::Struct(
vec![
ArrowField::new("int32", ArrowDataType::Int32, false),
ArrowField::new("string", ArrowDataType::Utf8, false),
]
.into(),
),
false,
),
ArrowField::new("j", ArrowDataType::Int32, false),
ArrowField::new(
"nested",
ArrowDataType::Struct(
vec![
ArrowField::new("int32", ArrowDataType::Int32, false),
ArrowField::new("string", ArrowDataType::Utf8, false),
]
.into(),
),
false,
),
ArrowField::new("i", ArrowDataType::Int32, false),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![2, 3, 4, 5];
let expect_reorder = vec![
ReorderIndex::identity(2),
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(0), ReorderIndex::identity(1)],
),
ReorderIndex::identity(0),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// Shared fixture: a four-row struct array with a boolean column `b` and an int column `c`.
fn make_struct_array() -> StructArray {
let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
StructArray::from(vec![
(
Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
boolean.clone() as ArrowArrayRef,
),
(
Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
int.clone() as ArrowArrayRef,
),
])
}
// Swapping the two columns of a flat struct array.
#[test]
fn simple_reorder_struct() {
let arry = make_struct_array();
let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)];
let ordered = reorder_struct_array(arry, &reorder).unwrap();
assert_eq!(ordered.column_names(), vec!["c", "b"]);
}
// Swaps two struct columns; the first also has its children swapped, the second keeps its
// children in order and gains a missing (all-null) string column `s`.
#[test]
fn nested_reorder_struct() {
let arry1 = Arc::new(make_struct_array());
let arry2 = Arc::new(make_struct_array());
let fields: Fields = vec![
Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
]
.into();
let nested = StructArray::from(vec![
(
Arc::new(ArrowField::new(
"struct1",
ArrowDataType::Struct(fields.clone()),
false,
)),
arry1 as ArrowArrayRef,
),
(
Arc::new(ArrowField::new(
"struct2",
ArrowDataType::Struct(fields),
false,
)),
arry2 as ArrowArrayRef,
),
]);
let reorder = vec![
ReorderIndex::nested(
1,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
),
ReorderIndex::nested(
0,
vec![
ReorderIndex::identity(0),
ReorderIndex::identity(1),
ReorderIndex::missing(
2,
Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true)),
),
],
),
];
let ordered = reorder_struct_array(nested, &reorder).unwrap();
assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]);
let ordered_s2 = ordered.column(0).as_struct();
assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]);
let ordered_s1 = ordered.column(1).as_struct();
assert_eq!(ordered_s1.column_names(), vec!["c", "b"]);
}
// Reorders the struct fields inside a list column (two lists of three structs each) and checks
// the field order of every list element.
#[test]
fn reorder_list_of_struct() {
let boolean = Arc::new(BooleanArray::from(vec![
false, false, true, true, false, true,
]));
let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31, 0, 3]));
let list_sa = StructArray::from(vec![
(
Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
boolean.clone() as ArrowArrayRef,
),
(
Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
int.clone() as ArrowArrayRef,
),
]);
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6]));
let list_field = ArrowField::new("item", list_sa.data_type().clone(), false);
let list = Arc::new(GenericListArray::new(
Arc::new(list_field),
offsets,
Arc::new(list_sa),
None,
));
let fields: Fields = vec![
Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)),
Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)),
]
.into();
let list_dt = Arc::new(ArrowField::new(
"list",
ArrowDataType::new_list(ArrowDataType::Struct(fields), false),
false,
));
let struct_array = StructArray::from(vec![(list_dt, list as ArrowArrayRef)]);
let reorder = vec![ReorderIndex::nested(
0,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
)];
let ordered = reorder_struct_array(struct_array, &reorder).unwrap();
let ordered_list_col = ordered.column(0).as_list::<i32>();
for i in 0..ordered_list_col.len() {
let array_item = ordered_list_col.value(i);
let struct_item = array_item.as_struct();
assert_eq!(struct_item.column_names(), vec!["c", "b"]);
}
}
// None of the requested (nullable) columns exist in the file: the mask is empty and every
// requested column becomes a `Missing` entry.
#[test]
fn no_matches() {
let requested_schema = Arc::new(StructType::new([
StructField::new("s", DataType::STRING, true),
StructField::new("i2", DataType::INTEGER, true),
]));
let nots_field = ArrowField::new("NOTs", ArrowDataType::Utf8, true);
let noti2_field = ArrowField::new("NOTi2", ArrowDataType::Int32, true);
let parquet_schema = Arc::new(ArrowSchema::new(vec![
nots_field.clone(),
noti2_field.clone(),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask: Vec<usize> = vec![];
let expect_reorder = vec![
ReorderIndex::missing(0, nots_field.with_name("s").into()),
ReorderIndex::missing(1, noti2_field.with_name("i2").into()),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// Requesting no columns at all yields an empty mask and an empty reorder plan.
#[test]
fn empty_requested_schema() {
let requested_schema = Arc::new(StructType::new([]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("s", ArrowDataType::Utf8, true),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask: Vec<usize> = vec![];
let expect_reorder = vec![];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}
// A two-row, single-column batch is written as two newline-terminated JSON objects.
#[test]
fn test_write_json() -> DeltaResult<()> {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"string",
ArrowDataType::Utf8,
true,
)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?;
assert_eq!(
json,
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
);
Ok(())
}
}