use std::sync::Arc;
use arrow_array::ArrayRef as ArrowArrayRef;
use arrow_array::StructArray as ArrowStructArray;
use arrow_buffer::NullBuffer;
use arrow_schema::Field;
use arrow_schema::Fields;
use itertools::Itertools;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
use crate::ArrayRef;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::arrays::Chunked;
use crate::arrays::ScalarFnVTable;
use crate::arrays::Struct;
use crate::arrays::StructArray;
use crate::arrays::scalar_fn::ScalarFnArrayExt;
use crate::arrays::struct_::StructDataParts;
use crate::arrow::ArrowArrayExecutor;
use crate::arrow::executor::validity::to_arrow_null_buffer;
use crate::builtins::ArrayBuiltins;
use crate::dtype::DType;
use crate::dtype::FieldNames;
use crate::dtype::StructFields;
use crate::dtype::arrow::FromArrowType;
use crate::scalar_fn::fns::pack::Pack;
pub(super) fn to_arrow_struct(
array: ArrayRef,
target_fields: Option<&Fields>,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrowArrayRef> {
let len = array.len();
let array = match array.try_downcast::<Chunked>() {
Ok(array) => {
array.into_array().execute::<StructArray>(ctx)?.into_array()
}
Err(array) => array,
};
let array = match array.try_downcast::<Struct>() {
Ok(array) => {
let StructDataParts {
validity,
fields,
struct_fields,
..
} = array.into_data_parts();
let validity = to_arrow_null_buffer(validity, len, ctx)?;
return create_from_fields(
target_fields.ok_or_else(|| struct_fields.names().clone()),
&fields,
validity,
len,
ctx,
);
}
Err(array) => array,
};
if let Some(array) = array.as_opt::<ScalarFnVTable>()
&& let Some(_pack_options) = array.scalar_fn().as_opt::<Pack>()
{
let DType::Struct(struct_fields, _) = array.dtype() else {
unreachable!("Pack must have Struct dtype");
};
return create_from_fields(
target_fields.ok_or_else(|| struct_fields.names().clone()),
&array.children(),
None, len,
ctx,
);
}
let array = if let Some(fields) = target_fields {
let vx_fields = StructFields::from_arrow(fields);
array.cast(DType::Struct(
vx_fields,
crate::dtype::Nullability::Nullable,
))?
} else {
array
};
let struct_array = array.execute::<StructArray>(ctx)?;
let StructDataParts {
validity,
fields,
struct_fields,
..
} = struct_array.into_data_parts();
let validity = to_arrow_null_buffer(validity, len, ctx)?;
create_from_fields(
target_fields.ok_or_else(|| struct_fields.names().clone()),
&fields,
validity,
len,
ctx,
)
}
fn create_from_fields(
fields: Result<&Fields, FieldNames>,
vortex_fields: &[ArrayRef],
null_buffer: Option<NullBuffer>,
len: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrowArrayRef> {
match fields {
Ok(fields) => {
vortex_ensure!(
vortex_fields.len() == fields.len(),
"StructArray has {} fields, but target Arrow type has {} fields",
vortex_fields.len(),
fields.len()
);
let mut arrow_arrays = Vec::with_capacity(vortex_fields.len());
for (field, vx_field) in fields.iter().zip_eq(vortex_fields.iter()) {
let arrow_field = vx_field
.clone()
.execute_arrow(Some(field.data_type()), ctx)?;
vortex_ensure!(
field.is_nullable() || arrow_field.null_count() == 0,
"Cannot convert field '{}' to non-nullable Arrow field because it contains nulls",
field.name()
);
arrow_arrays.push(arrow_field);
}
Ok(Arc::new(unsafe {
ArrowStructArray::new_unchecked_with_length(
fields.clone(),
arrow_arrays,
null_buffer,
len,
)
}))
}
Err(names) => {
let mut arrow_arrays = Vec::with_capacity(vortex_fields.len());
for vx_field in vortex_fields.iter() {
let arrow_array = vx_field.clone().execute_arrow(None, ctx)?;
arrow_arrays.push(arrow_array);
}
let arrow_fields: Fields = names
.iter()
.zip_eq(arrow_arrays.iter())
.zip_eq(vortex_fields.iter().map(|f| f.dtype().is_nullable()))
.map(|((name, arr), vx_nullable)| {
Arc::new(Field::new(
name.as_ref(),
arr.data_type().clone(),
vx_nullable,
))
})
.collect();
Ok(Arc::new(unsafe {
ArrowStructArray::new_unchecked_with_length(
arrow_fields,
arrow_arrays,
null_buffer,
len,
)
}))
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_array::ArrayRef;
use arrow_array::PrimitiveArray as ArrowPrimitiveArray;
use arrow_array::StringViewArray;
use arrow_array::StructArray as ArrowStructArray;
use arrow_array::types::Int32Type;
use arrow_buffer::NullBuffer;
use arrow_schema::DataType;
use arrow_schema::Field;
use vortex_buffer::buffer;
use vortex_error::VortexResult;
use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::array;
use crate::arrays;
use crate::arrays::PrimitiveArray;
use crate::arrays::StructArray;
use crate::arrow::ArrowArrayExecutor;
use crate::arrow::FromArrowArray;
use crate::arrow::IntoArrowArray;
use crate::dtype::FieldNames;
use crate::validity::Validity;
#[test]
fn struct_nullable_non_null_to_arrow() -> VortexResult<()> {
let xs = PrimitiveArray::new(buffer![0i64, 1, 2, 3, 4], Validity::AllValid);
let struct_a = StructArray::try_new(
FieldNames::from(["xs"]),
vec![xs.into_array()],
5,
Validity::AllValid,
)?;
let fields = vec![Field::new("xs", DataType::Int64, false)];
let arrow_dt = DataType::Struct(fields.into());
struct_a.into_array().into_arrow(&arrow_dt)?;
Ok(())
}
#[test]
fn struct_nullable_with_nulls_to_arrow() -> VortexResult<()> {
let xs =
PrimitiveArray::from_option_iter(vec![Some(0_i64), Some(1), Some(2), None, Some(3)]);
let struct_a = StructArray::try_new(
FieldNames::from(["xs"]),
vec![xs.into_array()],
5,
Validity::AllValid,
)?;
let fields = vec![Field::new("xs", DataType::Int64, false)];
let arrow_dt = DataType::Struct(fields.into());
assert!(struct_a.into_array().into_arrow(&arrow_dt).is_err());
Ok(())
}
#[test]
fn struct_to_arrow_with_schema_mismatch() -> VortexResult<()> {
let xs = PrimitiveArray::new(buffer![0i64, 1, 2, 3, 4], Validity::AllValid);
let struct_a = StructArray::try_new(
FieldNames::from(["xs"]),
vec![xs.into_array()],
5,
Validity::AllValid,
)?;
let fields = vec![
Field::new("xs", DataType::Int8, false),
Field::new("ys", DataType::Int64, false),
];
let arrow_dt = DataType::Struct(fields.into());
let err = struct_a.into_array().into_arrow(&arrow_dt).err().unwrap();
assert!(
err.to_string()
.contains("StructArray has 1 fields, but target Arrow type has 2 fields")
);
Ok(())
}
#[test]
fn test_to_arrow() -> VortexResult<()> {
let array = StructArray::from_fields(
vec![
(
"a",
PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array(),
),
(
"b",
arrays::varbinview::VarBinViewArray::from_iter_str(vec!["a", "b", "c"])
.into_array(),
),
]
.as_slice(),
)?;
let arrow_array: ArrayRef = Arc::new(ArrowStructArray::try_from(vec![
(
"a",
Arc::new(
ArrowPrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
vec![1, 0, 2],
Some(NullBuffer::from(vec![true, false, true])),
),
) as ArrayRef,
),
(
"b",
Arc::new(StringViewArray::from(vec![Some("a"), Some("b"), Some("c")])),
),
])?);
let arrow_dtype = array.dtype().to_arrow_dtype()?;
assert_eq!(
&array.into_array().execute_arrow(
Some(&arrow_dtype),
&mut LEGACY_SESSION.create_execution_ctx()
)?,
&arrow_array
);
Ok(())
}
#[test]
fn to_arrow_with_non_nullable_fields() -> VortexResult<()> {
let array = StructArray::from_fields(
vec![
(
"a",
PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array(),
),
(
"b",
arrays::varbinview::VarBinViewArray::from_iter_str(vec!["a", "b", "c"])
.into_array(),
),
]
.as_slice(),
)?;
let orig_dtype = array.dtype().clone();
let arrow_array = array.into_array().into_arrow_preferred()?;
let from_arrow = array::ArrayRef::from_arrow(arrow_array.as_ref(), false)?;
assert_eq!(&orig_dtype, from_arrow.dtype());
Ok(())
}
}