use std::collections::HashMap;
use std::sync::Arc;
use itertools::Itertools;
use tracing::debug;
use crate::arrow::array::cast::AsArray;
use crate::arrow::array::types::{
Date32Type, Decimal128Type, Float32Type, Float64Type, GenericStringType, Int32Type, Int64Type,
TimestampMicrosecondType,
};
use crate::arrow::array::{
Array, ArrayRef, GenericByteArray, OffsetSizeTrait, RecordBatch, RunArray, StringViewArray,
StructArray,
};
use crate::arrow::compute::filter_record_batch;
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, FieldRef, Schema as ArrowSchema,
};
use crate::engine::arrow_conversion::TryIntoArrow as _;
use crate::engine_data::{EngineData, GetData, RowVisitor, StringArrayAccessor};
use crate::expressions::ArrayData;
use crate::schema::{ColumnName, DataType, PrimitiveType, SchemaRef};
use crate::{DeltaResult, Error};
pub use crate::engine::arrow_utils::fix_nested_null_masks;
/// An [`EngineData`] implementation backed by an Arrow [`RecordBatch`].
pub struct ArrowEngineData {
    // The underlying Arrow record batch holding all rows/columns of this chunk.
    data: RecordBatch,
}
/// Extension trait for converting engine data into an Arrow [`RecordBatch`].
pub trait EngineDataArrowExt {
    /// Consume `self` and try to extract the underlying [`RecordBatch`].
    ///
    /// Fails if the engine data is not actually [`ArrowEngineData`].
    fn try_into_record_batch(self) -> DeltaResult<RecordBatch>;
}
impl EngineDataArrowExt for Box<dyn EngineData> {
fn try_into_record_batch(self) -> DeltaResult<RecordBatch> {
Ok(self
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into())
}
}
impl EngineDataArrowExt for DeltaResult<Box<dyn EngineData>> {
    /// Propagate any existing error with `?`, then convert the boxed engine
    /// data to a [`RecordBatch`].
    ///
    /// # Errors
    /// Returns the original error if `self` is `Err`, or an engine-data-type
    /// error when the data is not `ArrowEngineData`.
    fn try_into_record_batch(self) -> DeltaResult<RecordBatch> {
        // Delegate to the `Box<dyn EngineData>` impl rather than duplicating
        // the downcast logic (and its error construction) here.
        self?.try_into_record_batch()
    }
}
pub(crate) fn extract_record_batch(engine_data: &dyn EngineData) -> DeltaResult<&RecordBatch> {
let Some(arrow_data) = engine_data.any_ref().downcast_ref::<ArrowEngineData>() else {
return Err(Error::engine_data_type("ArrowEngineData"));
};
Ok(arrow_data.record_batch())
}
#[allow(dead_code)]
/// Arrow type of an unshredded variant value: a struct with two required
/// binary children, `metadata` and `value`.
pub(crate) fn unshredded_variant_arrow_type() -> ArrowDataType {
    ArrowDataType::Struct(
        vec![
            ArrowField::new("metadata", ArrowDataType::Binary, false),
            ArrowField::new("value", ArrowDataType::Binary, false),
        ]
        .into(),
    )
}
impl ArrowEngineData {
pub fn new(data: RecordBatch) -> Self {
ArrowEngineData { data }
}
pub fn try_from_engine_data(engine_data: Box<dyn EngineData>) -> DeltaResult<Box<Self>> {
engine_data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| Error::engine_data_type("ArrowEngineData"))
}
pub fn record_batch(&self) -> &RecordBatch {
&self.data
}
}
impl From<RecordBatch> for ArrowEngineData {
fn from(value: RecordBatch) -> Self {
ArrowEngineData::new(value)
}
}
impl From<StructArray> for ArrowEngineData {
fn from(value: StructArray) -> Self {
ArrowEngineData::new(value.into())
}
}
impl From<ArrowEngineData> for RecordBatch {
fn from(value: ArrowEngineData) -> Self {
value.data
}
}
impl From<Box<ArrowEngineData>> for RecordBatch {
fn from(value: Box<ArrowEngineData>) -> Self {
value.data
}
}
// `StringArrayAccessor` for `StringArray`/`LargeStringArray` (i32/i64 offsets).
impl<O: OffsetSizeTrait> StringArrayAccessor for GenericByteArray<GenericStringType<O>> {
    fn len(&self) -> usize {
        // Fully qualified to call the `Array` trait method, not the trait
        // method being defined here.
        Array::len(self)
    }
    fn value(&self, index: usize) -> &str {
        // Resolves to the inherent `GenericByteArray::value` (inherent methods
        // take precedence over trait methods), so this is not self-recursive.
        self.value(index)
    }
    fn is_valid(&self, index: usize) -> bool {
        Array::is_valid(self, index)
    }
}
// `StringArrayAccessor` for the Utf8View (`StringViewArray`) layout.
impl StringArrayAccessor for StringViewArray {
    fn len(&self) -> usize {
        Array::len(self)
    }
    fn value(&self, index: usize) -> &str {
        // Inherent `StringViewArray::value`, not the trait method (no recursion).
        self.value(index)
    }
    fn is_valid(&self, index: usize) -> bool {
        Array::is_valid(self, index)
    }
}
/// View `array` through a uniform string accessor, trying each supported Arrow
/// string encoding in turn: Utf8 (i32 offsets), LargeUtf8 (i64 offsets), then
/// Utf8View. Returns `None` if the array is none of these.
pub(crate) fn as_string_accessor(array: &dyn Array) -> Option<&dyn StringArrayAccessor> {
    array
        .as_string_opt::<i32>()
        .map(|a| a as &dyn StringArrayAccessor)
        .or_else(|| array.as_string_opt::<i64>().map(|a| a as _))
        .or_else(|| array.as_string_view_opt().map(|a| a as _))
}
/// Abstraction over the two Arrow containers we recurse into during column
/// extraction (`RecordBatch` at the top level, `StructArray` for nested levels),
/// exposing their child columns and the matching fields.
trait ProvidesColumnsAndFields {
    // Child arrays, parallel to `fields()`.
    fn columns(&self) -> &[ArrayRef];
    // Field metadata, parallel to `columns()`.
    fn fields(&self) -> &[FieldRef];
}
impl ProvidesColumnsAndFields for RecordBatch {
    fn columns(&self) -> &[ArrayRef] {
        self.columns()
    }
    fn fields(&self) -> &[FieldRef] {
        // Fields come from the batch's schema rather than the arrays themselves.
        self.schema_ref().fields()
    }
}
impl ProvidesColumnsAndFields for StructArray {
    fn columns(&self) -> &[ArrayRef] {
        self.columns()
    }
    fn fields(&self) -> &[FieldRef] {
        self.fields()
    }
}
/// Extraction state for each column path a row visitor asked about.
enum ColumnState<'a> {
    /// A non-leaf ancestor of a requested column; exists only so extraction
    /// knows to recurse into this struct.
    Parent,
    /// A requested leaf column whose getter has not been extracted yet; holds
    /// the kernel type the visitor expects.
    AwaitingGetter(&'a DataType),
    /// A requested leaf column whose getter has been extracted from the data.
    HasGetter(&'a dyn GetData<'a>),
}
impl EngineData for ArrowEngineData {
    /// Number of rows in the underlying record batch.
    fn len(&self) -> usize {
        self.data.num_rows()
    }
    /// Locate a getter for every requested leaf column (recursing into nested
    /// structs as needed), then invoke `visitor` with the getters in the exact
    /// order the caller requested.
    fn visit_rows(
        &self,
        leaf_columns: &[ColumnName],
        visitor: &mut dyn RowVisitor,
    ) -> DeltaResult<()> {
        // The visitor must declare exactly one type per requested column.
        let leaf_types = visitor.selected_column_names_and_types().1;
        if leaf_types.len() != leaf_columns.len() {
            return Err(Error::MissingColumn(format!(
                "Visitor expected {} column names, but caller passed {}",
                leaf_types.len(),
                leaf_columns.len()
            ))
            .with_backtrace());
        }
        // Map every requested leaf column -- plus each of its ancestors -- to a
        // state. Ancestors are marked `Parent` so extraction knows to recurse
        // through them. Capacity is a heuristic (leaf + roughly one parent each).
        let mut column_map = HashMap::with_capacity(leaf_columns.len() * 2);
        for (column, data_type) in leaf_columns.iter().zip(leaf_types.iter()) {
            column_map.insert(column.clone(), ColumnState::AwaitingGetter(data_type));
            let mut cur_parent = column.parent();
            while let Some(parent) = cur_parent {
                // `or_insert` so an ancestor that is itself a requested leaf is
                // not demoted to `Parent`.
                column_map
                    .entry(parent.clone())
                    .or_insert(ColumnState::Parent);
                cur_parent = parent.parent();
            }
        }
        debug!(
            "Column map for selected columns {leaf_columns:?} has {} entries",
            column_map.len()
        );
        // Walk the batch and flip `AwaitingGetter` entries to `HasGetter`.
        Self::extract_columns(&mut vec![], &mut column_map, &self.data)?;
        // Re-collect getters in the caller's requested order; a column still
        // without a getter was absent from the data.
        let mut getters = Vec::with_capacity(leaf_columns.len());
        for column in leaf_columns {
            match column_map.get(column.as_ref()) {
                Some(ColumnState::HasGetter(getter)) => getters.push(*getter),
                _ => {
                    return Err(Error::MissingColumn(format!(
                        "Column {column} not found in the data"
                    )));
                }
            }
        }
        // NOTE(review): this check looks unreachable -- the loop above either
        // pushes exactly one getter per requested column or returns early --
        // kept here as a defensive invariant check.
        if getters.len() != leaf_columns.len() {
            return Err(Error::MissingColumn(format!(
                "Visitor expected {} leaf columns, but only {} were found in the data",
                leaf_columns.len(),
                getters.len()
            )));
        }
        visitor.visit(self.len(), &getters)
    }
    /// Append `columns` (described by `schema`) to the right of the existing
    /// batch and return the widened data as new `EngineData`.
    ///
    /// Row-count and field/column-count mismatches surface as errors from
    /// `RecordBatch::try_new`.
    fn append_columns(
        &self,
        schema: SchemaRef,
        columns: Vec<ArrayData>,
    ) -> DeltaResult<Box<dyn EngineData>> {
        // Convert the kernel schema of the new columns into an Arrow schema.
        let schema: ArrowSchema = schema.as_ref().try_into_arrow()?;
        let mut combined_fields = self.data.schema().fields().to_vec();
        combined_fields.extend_from_slice(schema.fields());
        let combined_schema = Arc::new(ArrowSchema::new(combined_fields));
        // Convert each kernel ArrayData into an Arrow array, failing fast on
        // the first conversion error.
        let new_columns: Vec<ArrayRef> = columns
            .into_iter()
            .map(|array_data| array_data.to_arrow())
            .try_collect()?;
        let mut combined_columns = self.data.columns().to_vec();
        combined_columns.extend(new_columns);
        // `try_new` validates that all columns share the same row count and
        // that the column count matches the schema's field count.
        let data = RecordBatch::try_new(combined_schema, combined_columns)?;
        Ok(Box::new(ArrowEngineData { data }))
    }
    /// Keep only the rows whose selection-vector entry is `true`.
    ///
    /// A vector shorter than the data is padded with `true`, i.e. unspecified
    /// trailing rows are kept.
    fn apply_selection_vector(
        self: Box<Self>,
        mut selection_vector: Vec<bool>,
    ) -> DeltaResult<Box<dyn EngineData>> {
        selection_vector.resize(self.len(), true);
        let filtered = filter_record_batch(&self.data, &selection_vector.into())?;
        Ok(Box::new(Self::new(filtered)))
    }
}
impl ArrowEngineData {
    /// Recursively walk `data`, matching each (possibly nested) column path
    /// against `column_map` and installing a `GetData` getter for every leaf
    /// column still awaiting one. `path` holds the field names from the root
    /// down to the current nesting level and is restored before returning.
    fn extract_columns<'a>(
        path: &mut Vec<String>,
        column_map: &mut HashMap<ColumnName, ColumnState<'a>>,
        data: &'a dyn ProvidesColumnsAndFields,
    ) -> DeltaResult<()> {
        for (column, field) in data.columns().iter().zip(data.fields()) {
            path.push(field.name().to_string());
            if let Some(state) = column_map.get_mut(path.as_slice()) {
                match state {
                    ColumnState::Parent => {
                        // Only struct columns can be parents; a non-struct at a
                        // parent path is silently skipped here (its leaves then
                        // fail later as "not found").
                        if let Some(struct_array) = column.as_struct_opt() {
                            debug!(
                                "Recurse into a struct array for {}",
                                ColumnName::new(path.iter())
                            );
                            Self::extract_columns(path, column_map, struct_array)?;
                        }
                    }
                    ColumnState::AwaitingGetter(data_type) => {
                        let getter = if column.data_type() == &ArrowDataType::Null {
                            // A Null-typed column uses the unit getter, which
                            // yields None for every row regardless of type.
                            debug!("Pushing a null array for {}", ColumnName::new(path.iter()));
                            &() as &'a dyn GetData<'a>
                        } else {
                            Self::extract_leaf_column(path, data_type, column)?
                        };
                        *state = ColumnState::HasGetter(getter);
                    }
                    ColumnState::HasGetter(_) => {
                        // Seeing the same path twice means the schema contains
                        // a duplicate column name.
                        return Err(Error::internal_error(format!(
                            "Column {} already has a getter - duplicate column?",
                            ColumnName::new(path.iter())
                        )));
                    }
                }
            } else {
                // Path was not requested by the visitor; ignore it.
                debug!("Skipping unmasked path {}", ColumnName::new(path.iter()));
            }
            path.pop();
        }
        Ok(())
    }
    /// If `col` is run-end encoded, try to use the run array itself as a getter.
    /// NOTE(review): only `RunArray<Int64Type>` (Int64 run ends) is handled;
    /// Int16/Int32 run ends return None here and surface downstream as a
    /// type-mismatch error.
    fn try_extract_with_ree<'a>(col: &'a dyn Array) -> Option<&'a dyn GetData<'a>> {
        match col.data_type() {
            ArrowDataType::RunEndEncoded(_, _) => col
                .as_any()
                .downcast_ref::<RunArray<Int64Type>>()
                .map(|run_array| run_array as &'a dyn GetData<'a>),
            _ => None,
        }
    }
    /// Turn a leaf Arrow column into a `GetData` getter matching the expected
    /// kernel `data_type`.
    ///
    /// # Errors
    /// `UnexpectedColumnType` when the Arrow type does not match `data_type`,
    /// or when `data_type` itself is unsupported for row visitation.
    fn extract_leaf_column<'a>(
        path: &[String],
        data_type: &DataType,
        col: &'a dyn Array,
    ) -> DeltaResult<&'a dyn GetData<'a>> {
        // Any of the three Arrow string encodings counts as a kernel string.
        let is_string_type = |dt: &ArrowDataType| {
            matches!(
                dt,
                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View
            )
        };
        // Lists are only supported when the element type is a string; all four
        // Arrow list layouts are accepted.
        let col_as_list = || -> Option<&'a dyn GetData<'a>> {
            match col.data_type() {
                ArrowDataType::List(f)
                | ArrowDataType::LargeList(f)
                | ArrowDataType::ListView(f)
                | ArrowDataType::LargeListView(f)
                    if is_string_type(f.data_type()) => {}
                _ => return None,
            }
            col.as_list_opt::<i32>()
                .map(|a| a as _)
                .or_else(|| col.as_list_opt::<i64>().map(|a| a as _))
                .or_else(|| col.as_list_view_opt::<i32>().map(|a| a as _))
                .or_else(|| col.as_list_view_opt::<i64>().map(|a| a as _))
        };
        // Maps are only supported when both key and value types are strings.
        let col_as_map = || {
            col.as_map_opt().and_then(|array| {
                (is_string_type(array.key_type()) && is_string_type(array.value_type()))
                    .then_some(array as _)
            })
        };
        // Each arm yields either the getter or the expected type's name; the
        // name is turned into an `UnexpectedColumnType` error below.
        let result: Result<&'a dyn GetData<'a>, _> = match data_type {
            &DataType::BOOLEAN => {
                debug!("Pushing boolean array for {}", ColumnName::new(path));
                col.as_boolean_opt()
                    .map(|a| a as _)
                    .or_else(|| Self::try_extract_with_ree(col))
                    .ok_or("bool")
            }
            &DataType::STRING => {
                debug!("Pushing string array for {}", ColumnName::new(path));
                col.as_string_opt::<i32>()
                    .map(|a| a as _)
                    .or_else(|| col.as_string_opt::<i64>().map(|a| a as _))
                    .or_else(|| col.as_string_view_opt().map(|a| a as _))
                    .or_else(|| Self::try_extract_with_ree(col))
                    .ok_or("string")
            }
            &DataType::BINARY => {
                debug!("Pushing binary array for {}", ColumnName::new(path));
                col.as_binary_opt::<i32>()
                    .map(|a| a as _)
                    .or_else(|| col.as_binary_opt::<i64>().map(|a| a as _))
                    .or_else(|| col.as_binary_view_opt().map(|a| a as _))
                    .or_else(|| Self::try_extract_with_ree(col))
                    .ok_or("binary")
            }
            &DataType::INTEGER => {
                debug!("Pushing int32 array for {}", ColumnName::new(path));
                col.as_primitive_opt::<Int32Type>()
                    .map(|a| a as _)
                    .or_else(|| Self::try_extract_with_ree(col))
                    .ok_or("int")
            }
            &DataType::LONG => {
                debug!("Pushing int64 array for {}", ColumnName::new(path));
                col.as_primitive_opt::<Int64Type>()
                    .map(|a| a as _)
                    .or_else(|| Self::try_extract_with_ree(col))
                    .ok_or("long")
            }
            // NOTE: float/double/date/timestamp/decimal do not attempt the
            // run-end-encoded fallback.
            &DataType::FLOAT => {
                debug!("Pushing float array for {}", ColumnName::new(path));
                col.as_primitive_opt::<Float32Type>()
                    .map(|a| a as _)
                    .ok_or("float")
            }
            &DataType::DOUBLE => {
                debug!("Pushing double array for {}", ColumnName::new(path));
                col.as_primitive_opt::<Float64Type>()
                    .map(|a| a as _)
                    .ok_or("double")
            }
            &DataType::DATE => {
                debug!("Pushing date array for {}", ColumnName::new(path));
                col.as_primitive_opt::<Date32Type>()
                    .map(|a| a as _)
                    .ok_or("date")
            }
            // Both timestamp flavors are stored as microseconds in Arrow.
            &DataType::TIMESTAMP | &DataType::TIMESTAMP_NTZ => {
                debug!("Pushing timestamp array for {}", ColumnName::new(path));
                col.as_primitive_opt::<TimestampMicrosecondType>()
                    .map(|a| a as _)
                    .ok_or("timestamp")
            }
            DataType::Primitive(PrimitiveType::Decimal(_)) => {
                debug!("Pushing decimal array for {}", ColumnName::new(path));
                col.as_primitive_opt::<Decimal128Type>()
                    .map(|a| a as _)
                    .ok_or("decimal")
            }
            DataType::Array(_) => {
                debug!("Pushing list for {}", ColumnName::new(path));
                col_as_list().ok_or("array<string>")
            }
            DataType::Map(_) => {
                debug!("Pushing map for {}", ColumnName::new(path));
                col_as_map().ok_or("map<string, string>")
            }
            // Struct and other nested types are not valid *leaf* columns.
            data_type => {
                return Err(Error::UnexpectedColumnType(format!(
                    "On {}: Unsupported type {data_type}",
                    ColumnName::new(path)
                )));
            }
        };
        result.map_err(|type_name| {
            Error::UnexpectedColumnType(format!(
                "Type mismatch on {}: expected {}, got {}",
                ColumnName::new(path),
                type_name,
                col.data_type()
            ))
        })
    }
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, LazyLock};
use crate::actions::{get_commit_schema, Metadata, Protocol};
use crate::arrow::array::types::{Int32Type, Int64Type};
use crate::arrow::array::{
Array, ArrayRef, AsArray, BinaryArray, BooleanArray, Int32Array, Int64Array,
LargeBinaryArray, LargeStringArray, ListViewArray, MapArray, RecordBatch, RunArray,
StringArray, StringViewArray, StructArray,
};
use crate::arrow::buffer::{OffsetBuffer, ScalarBuffer};
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
use crate::engine::sync::SyncEngine;
use crate::engine_data::{GetData, ListItem, MapItem, RowVisitor, TypedGetData};
use crate::expressions::ArrayData;
use crate::schema::{ArrayType, ColumnName, DataType, StructField, StructType};
use crate::table_features::TableFeature;
use crate::utils::test_utils::{assert_result_error_with_message, string_array_to_engine_data};
use crate::{DeltaResult, Engine as _, EngineData as _};
use rstest::rstest;
use super::{extract_record_batch, ArrowEngineData};
// Parse a metaData JSON action through the engine's JSON handler and verify
// the extracted id, createdTime, and partition columns.
#[test]
fn test_md_extract() -> DeltaResult<()> {
    let engine = SyncEngine::new();
    let handler = engine.json_handler();
    let json_strings: StringArray = vec![
        r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
    ]
    .into();
    let output_schema = get_commit_schema().clone();
    let parsed = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap();
    let metadata = Metadata::try_new_from_data(parsed.as_ref())?.unwrap();
    assert_eq!(metadata.id(), "aff5cb91-8cd9-4195-aef9-446908507302");
    assert_eq!(metadata.created_time(), Some(1670892997849));
    assert_eq!(*metadata.partition_columns(), vec!("c1", "c2"));
    Ok(())
}
// Parse a protocol JSON action and verify reader/writer versions and the
// reader/writer feature lists.
#[test]
fn test_protocol_extract() -> DeltaResult<()> {
    let engine = SyncEngine::new();
    let handler = engine.json_handler();
    let json_strings: StringArray = vec![
        r#"{"protocol": {"minReaderVersion": 3, "minWriterVersion": 7, "readerFeatures": ["rw1"], "writerFeatures": ["rw1", "w2"]}}"#,
    ]
    .into();
    // Project down to just the protocol column.
    let output_schema = get_commit_schema().project(&["protocol"])?;
    let parsed = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap();
    let protocol = Protocol::try_new_from_data(parsed.as_ref())?.unwrap();
    assert_eq!(protocol.min_reader_version(), 3);
    assert_eq!(protocol.min_writer_version(), 7);
    assert_eq!(
        protocol.reader_features(),
        Some([TableFeature::unknown("rw1")].as_slice())
    );
    assert_eq!(
        protocol.writer_features(),
        Some([TableFeature::unknown("rw1"), TableFeature::unknown("w2")].as_slice())
    );
    Ok(())
}
// Happy path for append_columns: two new columns appended to a two-column
// batch; verifies names, types, and values in the combined batch.
#[test]
fn test_append_columns() -> DeltaResult<()> {
    let initial_schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("id", ArrowDataType::Int32, false),
        ArrowField::new("name", ArrowDataType::Utf8, true),
    ]));
    let initial_batch = RecordBatch::try_new(
        initial_schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])),
        ],
    )?;
    let arrow_data = ArrowEngineData::new(initial_batch);
    let new_columns = vec![
        ArrayData::try_new(
            ArrayType::new(DataType::INTEGER, true),
            vec![Some(25), None],
        )?,
        ArrayData::try_new(ArrayType::new(DataType::BOOLEAN, false), vec![true, false])?,
    ];
    let new_schema = Arc::new(StructType::new_unchecked([
        StructField::new("age", DataType::INTEGER, true),
        StructField::new("active", DataType::BOOLEAN, false),
    ]));
    let arrow_data = arrow_data.append_columns(new_schema, new_columns)?;
    let result_batch = extract_record_batch(arrow_data.as_ref())?;
    assert_eq!(result_batch.num_columns(), 4);
    assert_eq!(result_batch.num_rows(), 2);
    // Original columns come first, appended columns after, in order.
    let schema = result_batch.schema();
    assert_eq!(schema.field(0).name(), "id");
    assert_eq!(schema.field(1).name(), "name");
    assert_eq!(schema.field(2).name(), "age");
    assert_eq!(schema.field(3).name(), "active");
    assert_eq!(schema.field(0).data_type(), &ArrowDataType::Int32);
    assert_eq!(schema.field(1).data_type(), &ArrowDataType::Utf8);
    assert_eq!(schema.field(2).data_type(), &ArrowDataType::Int32);
    assert_eq!(schema.field(3).data_type(), &ArrowDataType::Boolean);
    let id_column = result_batch.column(0).as_primitive::<Int32Type>();
    let name_column = result_batch.column(1).as_string::<i32>();
    let age_column = result_batch.column(2).as_primitive::<Int32Type>();
    let active_column = result_batch.column(3).as_boolean();
    assert_eq!(id_column.values(), &[1, 2]);
    assert_eq!(name_column.value(0), "Alice");
    assert_eq!(name_column.value(1), "Bob");
    assert_eq!(age_column.value(0), 25);
    assert!(age_column.is_null(1));
    assert!(active_column.value(0));
    assert!(!active_column.value(1));
    Ok(())
}
// append_columns must fail when the new column has a different row count
// (3 rows appended to a 2-row batch).
#[test]
fn test_append_columns_row_mismatch() -> DeltaResult<()> {
    let initial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "id",
        ArrowDataType::Int32,
        false,
    )]));
    let initial_batch =
        RecordBatch::try_new(initial_schema, vec![Arc::new(Int32Array::from(vec![1, 2]))])?;
    let arrow_data = super::ArrowEngineData::new(initial_batch);
    let new_columns = vec![ArrayData::try_new(
        ArrayType::new(DataType::INTEGER, false),
        vec![25, 30, 35],
    )?];
    let new_schema = Arc::new(StructType::new_unchecked([StructField::new(
        "age",
        DataType::INTEGER,
        true,
    )]));
    let result = arrow_data.append_columns(new_schema, new_columns);
    // The error comes from RecordBatch::try_new's row-length validation.
    assert_result_error_with_message(
        result,
        "all columns in a record batch must have the same length",
    );
    Ok(())
}
// append_columns must fail when the new schema declares more fields than
// columns were supplied (2 fields, 1 column).
#[test]
fn test_append_columns_schema_field_count_mismatch() -> DeltaResult<()> {
    let initial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "id",
        ArrowDataType::Int32,
        false,
    )]));
    let initial_batch =
        RecordBatch::try_new(initial_schema, vec![Arc::new(Int32Array::from(vec![1, 2]))])?;
    let arrow_data = ArrowEngineData::new(initial_batch);
    let new_columns = vec![ArrayData::try_new(
        ArrayType::new(DataType::STRING, true),
        vec![Some("Alice".to_string()), Some("Bob".to_string())],
    )?];
    let new_schema = Arc::new(StructType::new_unchecked([
        StructField::new("name", DataType::STRING, true),
        StructField::new("email", DataType::STRING, true), ]));
    let result = arrow_data.append_columns(new_schema, new_columns);
    // Counts in the message are for the *combined* batch: 2 columns, 3 fields.
    assert_result_error_with_message(
        result,
        "number of columns(2) must match number of fields(3)",
    );
    Ok(())
}
// append_columns on a zero-row batch succeeds and stays zero-row.
#[test]
fn test_append_columns_empty_existing_data() -> DeltaResult<()> {
    let initial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "id",
        ArrowDataType::Int32,
        false,
    )]));
    let initial_batch = RecordBatch::try_new(
        initial_schema,
        vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
    )?;
    let arrow_data = ArrowEngineData::new(initial_batch);
    let new_columns = vec![ArrayData::try_new(
        ArrayType::new(DataType::STRING, true),
        Vec::<Option<String>>::new(),
    )?];
    let new_schema = Arc::new(StructType::new_unchecked([StructField::new(
        "name",
        DataType::STRING,
        true,
    )]));
    let result_data = arrow_data.append_columns(new_schema, new_columns)?;
    let result_batch = extract_record_batch(result_data.as_ref())?;
    assert_eq!(result_batch.num_columns(), 2);
    assert_eq!(result_batch.num_rows(), 0);
    assert_eq!(result_batch.schema().field(0).name(), "id");
    assert_eq!(result_batch.schema().field(1).name(), "name");
    Ok(())
}
// Appending zero new columns is a no-op that preserves the original batch.
#[test]
fn test_append_columns_empty_new_columns() -> DeltaResult<()> {
    let initial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "id",
        ArrowDataType::Int32,
        false,
    )]));
    let initial_batch =
        RecordBatch::try_new(initial_schema, vec![Arc::new(Int32Array::from(vec![1, 2]))])?;
    let arrow_data = ArrowEngineData::new(initial_batch);
    let new_columns = vec![];
    let new_schema = Arc::new(StructType::new_unchecked([]));
    let result_data = arrow_data.append_columns(new_schema, new_columns)?;
    let result_batch = extract_record_batch(result_data.as_ref())?;
    assert_eq!(result_batch.num_columns(), 1);
    assert_eq!(result_batch.num_rows(), 2);
    assert_eq!(result_batch.schema().field(0).name(), "id");
    Ok(())
}
// Appended nullable columns keep their null entries and nullability flags.
#[test]
fn test_append_columns_with_nulls() -> DeltaResult<()> {
    let initial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "id",
        ArrowDataType::Int32,
        false,
    )]));
    let initial_batch = RecordBatch::try_new(
        initial_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let arrow_data = ArrowEngineData::new(initial_batch);
    let new_columns = vec![
        ArrayData::try_new(
            ArrayType::new(DataType::STRING, true),
            vec![Some("Alice".to_string()), None, Some("Charlie".to_string())],
        )?,
        ArrayData::try_new(
            ArrayType::new(DataType::INTEGER, true),
            vec![Some(25), Some(30), None],
        )?,
    ];
    let new_schema = Arc::new(StructType::new_unchecked([
        StructField::new("name", DataType::STRING, true),
        StructField::new("age", DataType::INTEGER, true),
    ]));
    let result_data = arrow_data.append_columns(new_schema, new_columns)?;
    let result_batch = extract_record_batch(result_data.as_ref())?;
    assert_eq!(result_batch.num_columns(), 3);
    assert_eq!(result_batch.num_rows(), 3);
    // Original non-nullable field stays non-nullable; appended ones nullable.
    assert!(!result_batch.schema().field(0).is_nullable());
    assert!(result_batch.schema().field(1).is_nullable());
    assert!(result_batch.schema().field(2).is_nullable());
    Ok(())
}
// Appended columns of long/double/boolean map to the expected Arrow types.
#[test]
fn test_append_columns_various_data_types() -> DeltaResult<()> {
    let initial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "id",
        ArrowDataType::Int32,
        false,
    )]));
    let initial_batch =
        RecordBatch::try_new(initial_schema, vec![Arc::new(Int32Array::from(vec![1, 2]))])?;
    let arrow_data = ArrowEngineData::new(initial_batch);
    let new_columns = vec![
        ArrayData::try_new(
            ArrayType::new(DataType::LONG, false),
            vec![1000_i64, 2000_i64],
        )?,
        ArrayData::try_new(
            ArrayType::new(DataType::DOUBLE, true),
            vec![Some(3.87), Some(2.71)],
        )?,
        ArrayData::try_new(ArrayType::new(DataType::BOOLEAN, false), vec![true, false])?,
    ];
    let new_schema = Arc::new(StructType::new_unchecked([
        StructField::new("big_number", DataType::LONG, false),
        StructField::new("pi", DataType::DOUBLE, true),
        StructField::new("flag", DataType::BOOLEAN, false),
    ]));
    let result_data = arrow_data.append_columns(new_schema, new_columns)?;
    let result_batch = extract_record_batch(result_data.as_ref())?;
    assert_eq!(result_batch.num_columns(), 4);
    assert_eq!(result_batch.num_rows(), 2);
    let schema = result_batch.schema();
    assert_eq!(schema.field(0).data_type(), &ArrowDataType::Int32);
    assert_eq!(schema.field(1).data_type(), &ArrowDataType::Int64);
    assert_eq!(schema.field(2).data_type(), &ArrowDataType::Float64);
    assert_eq!(schema.field(3).data_type(), &ArrowDataType::Boolean);
    Ok(())
}
// Appending exactly one column places it last in the combined schema.
#[test]
fn test_append_single_column() -> DeltaResult<()> {
    let initial_schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("id", ArrowDataType::Int32, false),
        ArrowField::new("name", ArrowDataType::Utf8, true),
    ]));
    let initial_batch = RecordBatch::try_new(
        initial_schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec![
                Some("Alice"),
                Some("Bob"),
                Some("Charlie"),
            ])),
        ],
    )?;
    let arrow_data = ArrowEngineData::new(initial_batch);
    let new_columns = vec![ArrayData::try_new(
        ArrayType::new(DataType::BOOLEAN, false),
        vec![true, false, true],
    )?];
    let new_schema = Arc::new(StructType::new_unchecked([StructField::new(
        "active",
        DataType::BOOLEAN,
        false,
    )]));
    let result_data = arrow_data.append_columns(new_schema, new_columns)?;
    let result_batch = extract_record_batch(result_data.as_ref())?;
    assert_eq!(result_batch.num_columns(), 3);
    assert_eq!(result_batch.num_rows(), 3);
    assert_eq!(result_batch.schema().field(2).name(), "active");
    Ok(())
}
// Visit a BINARY column through a RowVisitor and verify values (including a
// null and raw bytes) round-trip.
#[test]
fn test_binary_column_extraction() -> DeltaResult<()> {
    let binary_data: Vec<Option<&[u8]>> = vec![
        Some(b"hello"),
        Some(b"world"),
        None,
        Some(b"\x00\x01\x02\x03"),
    ];
    let binary_array = BinaryArray::from(binary_data.clone());
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "data",
        ArrowDataType::Binary,
        true,
    )]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(binary_array)])?;
    let arrow_data = ArrowEngineData::new(batch);
    // Minimal visitor that collects every row's binary value (or None).
    struct BinaryVisitor {
        values: Vec<Option<Vec<u8>>>,
    }
    impl RowVisitor for BinaryVisitor {
        fn selected_column_names_and_types(
            &self,
        ) -> (&'static [ColumnName], &'static [DataType]) {
            static NAMES: LazyLock<Vec<ColumnName>> =
                LazyLock::new(|| vec![ColumnName::new(["data"])]);
            static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::BINARY]);
            (&NAMES, &TYPES)
        }
        fn visit<'a>(
            &mut self,
            row_count: usize,
            getters: &[&'a dyn GetData<'a>],
        ) -> DeltaResult<()> {
            assert_eq!(getters.len(), 1);
            let getter = getters[0];
            for i in 0..row_count {
                self.values
                    .push(getter.get_binary(i, "data")?.map(|b| b.to_vec()));
            }
            Ok(())
        }
    }
    let mut visitor = BinaryVisitor { values: vec![] };
    arrow_data.visit_rows(&[ColumnName::new(["data"])], &mut visitor)?;
    assert_eq!(visitor.values.len(), 4);
    assert_eq!(visitor.values[0].as_deref(), Some(b"hello".as_ref()));
    assert_eq!(visitor.values[1].as_deref(), Some(b"world".as_ref()));
    assert_eq!(visitor.values[2], None);
    assert_eq!(
        visitor.values[3].as_deref(),
        Some(b"\x00\x01\x02\x03".as_ref())
    );
    Ok(())
}
// Requesting a BINARY column over an Int32 array must surface as a type
// mismatch error from extract_leaf_column.
#[test]
fn test_binary_column_extraction_type_mismatch() -> DeltaResult<()> {
    let data: Vec<Option<i32>> = vec![Some(123)];
    let int_array = Int32Array::from(data);
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "data",
        ArrowDataType::Int32,
        true,
    )]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)])?;
    let arrow_data = ArrowEngineData::new(batch);
    // Same visitor shape as above; it never actually gets to run.
    struct BinaryVisitor {
        values: Vec<Option<Vec<u8>>>,
    }
    impl RowVisitor for BinaryVisitor {
        fn selected_column_names_and_types(
            &self,
        ) -> (&'static [ColumnName], &'static [DataType]) {
            static NAMES: LazyLock<Vec<ColumnName>> =
                LazyLock::new(|| vec![ColumnName::new(["data"])]);
            static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::BINARY]);
            (&NAMES, &TYPES)
        }
        fn visit<'a>(
            &mut self,
            row_count: usize,
            getters: &[&'a dyn GetData<'a>],
        ) -> DeltaResult<()> {
            assert_eq!(getters.len(), 1);
            let getter = getters[0];
            for i in 0..row_count {
                self.values
                    .push(getter.get_binary(i, "data")?.map(|b| b.to_vec()));
            }
            Ok(())
        }
    }
    let mut visitor = BinaryVisitor { values: vec![] };
    let result = arrow_data.visit_rows(&[ColumnName::new(["data"])], &mut visitor);
    assert_result_error_with_message(
        result,
        "Type mismatch on data: expected binary, got Int32",
    );
    Ok(())
}
// Getters must be returned in the order the caller requested, independent of
// the physical column order in the batch (including nested struct fields).
#[test]
fn test_column_ordering_independence() -> DeltaResult<()> {
    let nested_fields = vec![
        ArrowField::new("x", ArrowDataType::Int32, false),
        ArrowField::new("y", ArrowDataType::Int32, false),
    ];
    let batch = RecordBatch::try_new(
        Arc::new(ArrowSchema::new(vec![
            ArrowField::new("field_a", ArrowDataType::Int32, false),
            ArrowField::new("field_b", ArrowDataType::Int32, false),
            ArrowField::new(
                "nested",
                ArrowDataType::Struct(nested_fields.clone().into()),
                false,
            ),
        ])),
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(Int32Array::from(vec![10, 20])),
            Arc::new(StructArray::try_new(
                nested_fields.into(),
                vec![
                    Arc::new(Int32Array::from(vec![100, 200])),
                    Arc::new(Int32Array::from(vec![1000, 2000])),
                ],
                None,
            )?),
        ],
    )?;
    // Deliberately scrambled relative to the batch's physical layout.
    static REQUESTED_COLUMNS: LazyLock<Vec<ColumnName>> = LazyLock::new(|| {
        vec![
            ColumnName::new(["nested", "y"]),
            ColumnName::new(["field_b"]),
            ColumnName::new(["nested", "x"]),
            ColumnName::new(["field_a"]),
        ]
    });
    struct Visitor {
        values: Vec<(i32, i32, i32, i32)>,
    }
    impl RowVisitor for Visitor {
        fn selected_column_names_and_types(
            &self,
        ) -> (&'static [ColumnName], &'static [DataType]) {
            static TYPES: LazyLock<Vec<DataType>> =
                LazyLock::new(|| vec![DataType::INTEGER; 4]);
            (&REQUESTED_COLUMNS, &TYPES)
        }
        fn visit<'a>(
            &mut self,
            row_count: usize,
            getters: &[&'a dyn GetData<'a>],
        ) -> DeltaResult<()> {
            for i in 0..row_count {
                // Getter index follows REQUESTED_COLUMNS order.
                self.values.push((
                    getters[0].get(i, "nested.y")?,
                    getters[1].get(i, "field_b")?,
                    getters[2].get(i, "nested.x")?,
                    getters[3].get(i, "field_a")?,
                ));
            }
            Ok(())
        }
    }
    let mut visitor = Visitor { values: vec![] };
    ArrowEngineData::new(batch).visit_rows(&REQUESTED_COLUMNS, &mut visitor)?;
    assert_eq!(visitor.values, vec![(1000, 10, 100, 1), (2000, 20, 200, 2)]);
    Ok(())
}
// A batch with two columns of the same name must trip the duplicate-column
// internal error in extract_columns.
#[test]
fn test_visit_duplicate_column_error() -> DeltaResult<()> {
    let batch = RecordBatch::try_new(
        Arc::new(ArrowSchema::new(vec![
            ArrowField::new("field_a", ArrowDataType::Int32, false),
            ArrowField::new("field_a", ArrowDataType::Int32, false), ])),
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(Int32Array::from(vec![10, 20])),
        ],
    )?;
    static REQUESTED_COLUMNS: LazyLock<Vec<ColumnName>> =
        LazyLock::new(|| vec![ColumnName::new(["field_a"])]);
    // Visitor body never runs; visit_rows errors while extracting getters.
    struct DummyVisitor;
    impl RowVisitor for DummyVisitor {
        fn selected_column_names_and_types(
            &self,
        ) -> (&'static [ColumnName], &'static [DataType]) {
            static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::INTEGER]);
            (&REQUESTED_COLUMNS, &TYPES)
        }
        fn visit<'a>(
            &mut self,
            _row_count: usize,
            _getters: &[&'a dyn crate::engine_data::GetData<'a>],
        ) -> DeltaResult<()> {
            Ok(())
        }
    }
    let mut visitor = DummyVisitor;
    let result = ArrowEngineData::new(batch).visit_rows(&REQUESTED_COLUMNS, &mut visitor);
    assert_result_error_with_message(
        result,
        "Column field_a already has a getter - duplicate column?",
    );
    Ok(())
}
// Out-of-bounds access on run-end-encoded getters must error and name the
// requested field, for every supported value type.
#[test]
fn test_run_array_out_of_bounds_errors() -> DeltaResult<()> {
    // A single run covering logical rows 0..2.
    let run_ends = Int64Array::from(vec![2]);
    let str_array =
        RunArray::<Int64Type>::try_new(&run_ends, &StringArray::from(vec!["test"]))?;
    let err_msg = str_array.get_str(2, "str_field").unwrap_err().to_string();
    assert!(err_msg.contains("out of bounds") && err_msg.contains("str_field"));
    let int_array = RunArray::<Int64Type>::try_new(&run_ends, &Int32Array::from(vec![42]))?;
    let err_msg = int_array.get_int(5, "int_field").unwrap_err().to_string();
    assert!(err_msg.contains("out of bounds") && err_msg.contains("int_field"));
    let long_array =
        RunArray::<Int64Type>::try_new(&run_ends, &Int64Array::from(vec![100i64]))?;
    let err_msg = long_array
        .get_long(3, "long_field")
        .unwrap_err()
        .to_string();
    assert!(err_msg.contains("out of bounds") && err_msg.contains("long_field"));
    let bool_array =
        RunArray::<Int64Type>::try_new(&run_ends, &BooleanArray::from(vec![true]))?;
    let err_msg = bool_array
        .get_bool(2, "bool_field")
        .unwrap_err()
        .to_string();
    assert!(err_msg.contains("out of bounds") && err_msg.contains("bool_field"));
    let binary_array = RunArray::<Int64Type>::try_new(
        &run_ends,
        &BinaryArray::from(vec![Some(b"data".as_ref())]),
    )?;
    let err_msg = binary_array
        .get_binary(4, "binary_field")
        .unwrap_err()
        .to_string();
    assert!(err_msg.contains("out of bounds") && err_msg.contains("binary_field"));
    Ok(())
}
// Full visitor pass over run-end-encoded columns of all supported types:
// runs [0..2)="a"/1/10/true/"x", [2..4)=null, [4..5)="b"/2/20/false/"y".
#[test]
fn test_run_array_extraction_via_visitor() -> DeltaResult<()> {
    let run_ends = Int64Array::from(vec![2, 4, 5]);
    // Helper to declare a nullable RunEndEncoded field with Int64 run ends.
    let mk_field = |name, dt| {
        ArrowField::new(
            name,
            ArrowDataType::RunEndEncoded(
                Arc::new(ArrowField::new("run_ends", ArrowDataType::Int64, false)),
                Arc::new(ArrowField::new("values", dt, true)),
            ),
            true,
        )
    };
    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(RunArray::<Int64Type>::try_new(
            &run_ends,
            &StringArray::from(vec![Some("a"), None, Some("b")]),
        )?),
        Arc::new(RunArray::<Int64Type>::try_new(
            &run_ends,
            &Int32Array::from(vec![Some(1), None, Some(2)]),
        )?),
        Arc::new(RunArray::<Int64Type>::try_new(
            &run_ends,
            &Int64Array::from(vec![Some(10i64), None, Some(20)]),
        )?),
        Arc::new(RunArray::<Int64Type>::try_new(
            &run_ends,
            &BooleanArray::from(vec![Some(true), None, Some(false)]),
        )?),
        Arc::new(RunArray::<Int64Type>::try_new(
            &run_ends,
            &BinaryArray::from(vec![Some(b"x".as_ref()), None, Some(b"y".as_ref())]),
        )?),
    ];
    let schema = Arc::new(ArrowSchema::new(vec![
        mk_field("s", ArrowDataType::Utf8),
        mk_field("i", ArrowDataType::Int32),
        mk_field("l", ArrowDataType::Int64),
        mk_field("b", ArrowDataType::Boolean),
        mk_field("bin", ArrowDataType::Binary),
    ]));
    let arrow_data = ArrowEngineData::new(RecordBatch::try_new(schema, columns)?);
    // One decoded logical row across all five columns.
    type Row = (
        Option<String>,
        Option<i32>,
        Option<i64>,
        Option<bool>,
        Option<Vec<u8>>,
    );
    struct TestVisitor {
        data: Vec<Row>,
    }
    impl RowVisitor for TestVisitor {
        fn selected_column_names_and_types(
            &self,
        ) -> (&'static [ColumnName], &'static [DataType]) {
            static COLUMNS: LazyLock<[ColumnName; 5]> = LazyLock::new(|| {
                [
                    ColumnName::new(["s"]),
                    ColumnName::new(["i"]),
                    ColumnName::new(["l"]),
                    ColumnName::new(["b"]),
                    ColumnName::new(["bin"]),
                ]
            });
            static TYPES: &[DataType] = &[
                DataType::STRING,
                DataType::INTEGER,
                DataType::LONG,
                DataType::BOOLEAN,
                DataType::BINARY,
            ];
            (&*COLUMNS, TYPES)
        }
        fn visit<'a>(
            &mut self,
            row_count: usize,
            getters: &[&'a dyn GetData<'a>],
        ) -> DeltaResult<()> {
            for i in 0..row_count {
                self.data.push((
                    getters[0].get_str(i, "s")?.map(|s| s.to_string()),
                    getters[1].get_int(i, "i")?,
                    getters[2].get_long(i, "l")?,
                    getters[3].get_bool(i, "b")?,
                    getters[4].get_binary(i, "bin")?.map(|b| b.to_vec()),
                ));
            }
            Ok(())
        }
    }
    let mut visitor = TestVisitor { data: vec![] };
    visitor.visit_rows_of(&arrow_data)?;
    // Expect each run's value repeated for its logical length.
    let expected = vec![
        (
            Some("a".into()),
            Some(1),
            Some(10),
            Some(true),
            Some(b"x".to_vec()),
        ),
        (
            Some("a".into()),
            Some(1),
            Some(10),
            Some(true),
            Some(b"x".to_vec()),
        ),
        (None, None, None, None, None),
        (None, None, None, None, None),
        (
            Some("b".into()),
            Some(2),
            Some(20),
            Some(false),
            Some(b"y".to_vec()),
        ),
    ];
    assert_eq!(visitor.data, expected);
    Ok(())
}
fn create_map_array(entries: Vec<Vec<(&str, Option<&str>)>>) -> MapArray {
let mut all_keys = vec![];
let mut all_values = vec![];
let mut offsets = vec![0i32];
for entry_group in entries {
for (key, value) in entry_group {
all_keys.push(Some(key));
all_values.push(value);
}
offsets.push(all_keys.len() as i32);
}
let keys_array =
Arc::new(StringArray::from(all_keys)) as Arc<dyn crate::arrow::array::Array>;
let values_array =
Arc::new(StringArray::from(all_values)) as Arc<dyn crate::arrow::array::Array>;
let entries_struct = StructArray::try_new(
vec![
Arc::new(ArrowField::new("keys", ArrowDataType::Utf8, false)),
Arc::new(ArrowField::new("values", ArrowDataType::Utf8, true)),
]
.into(),
vec![keys_array, values_array],
None,
)
.unwrap();
let offsets_buffer = OffsetBuffer::new(offsets.into());
MapArray::try_new(
Arc::new(ArrowField::new_struct(
"entries",
vec![
Arc::new(ArrowField::new("keys", ArrowDataType::Utf8, false)),
Arc::new(ArrowField::new("values", ArrowDataType::Utf8, true)),
],
false,
)),
offsets_buffer,
entries_struct,
None,
false,
)
.unwrap()
}
/// Extracts row `row` of `map` as a `MapItem` backed by string accessors over
/// the map's flattened key and value child arrays.
fn map_item_from<'a>(map: &'a MapArray, row: usize) -> MapItem<'a> {
    let key_accessor = super::as_string_accessor(map.keys().as_ref()).unwrap();
    let value_accessor = super::as_string_accessor(map.values().as_ref()).unwrap();
    // The offsets buffer delimits this row's slice of the flattened entries.
    let offsets = map.offsets();
    let entry_range = (offsets[row] as usize)..(offsets[row + 1] as usize);
    MapItem::new(key_accessor, value_accessor, entry_range)
}
// materialize() and point lookups via get() must agree on every entry.
#[test]
fn test_materialize_matches_get() -> DeltaResult<()> {
    // A single map row with three distinct keys.
    let map_array = create_map_array(vec![vec![
        ("key1", Some("value1")),
        ("key2", Some("value2")),
        ("key3", Some("value3")),
    ]]);
    let item = map_item_from(&map_array, 0);
    let materialized = item.materialize();
    // Every materialized entry must be reachable through get() as well.
    for (key, value) in &materialized {
        assert_eq!(item.get(key), Some(value.as_str()));
    }
    assert_eq!(materialized.len(), 3);
    Ok(())
}
// Entries whose value is null are dropped by materialize().
#[test]
fn test_materialize_handles_nulls() -> DeltaResult<()> {
    // Key "b" maps to a null value and must not appear in the result.
    let map_array =
        create_map_array(vec![vec![("a", Some("1")), ("b", None), ("c", Some("3"))]]);
    let result = map_item_from(&map_array, 0).materialize();
    assert_eq!(result.len(), 2);
    assert_eq!(result.get("a"), Some(&"1".to_string()));
    assert_eq!(result.get("b"), None);
    assert_eq!(result.get("c"), Some(&"3".to_string()));
    Ok(())
}
// A present-but-empty map row materializes to an empty HashMap.
#[test]
fn test_materialize_empty_map() -> DeltaResult<()> {
    let map_array = create_map_array(vec![vec![]]);
    let result = map_item_from(&map_array, 0).materialize();
    assert_eq!(result.len(), 0);
    Ok(())
}
// Each map row materializes only its own slice of the flattened entries.
#[test]
fn test_materialize_multiple_rows() -> DeltaResult<()> {
    let map_array = create_map_array(vec![
        vec![("a", Some("1")), ("b", Some("2"))],
        vec![("x", Some("10")), ("y", Some("20"))],
    ]);
    // Row 0 sees only its own two entries.
    let first = map_item_from(&map_array, 0).materialize();
    assert_eq!(first.len(), 2);
    assert_eq!(first.get("a"), Some(&"1".to_string()));
    assert_eq!(first.get("b"), Some(&"2".to_string()));
    // Row 1 likewise, with no bleed-over from row 0.
    let second = map_item_from(&map_array, 1).materialize();
    assert_eq!(second.len(), 2);
    assert_eq!(second.get("x"), Some(&"10".to_string()));
    assert_eq!(second.get("y"), Some(&"20".to_string()));
    Ok(())
}
// With duplicate keys, both materialize() and get() must resolve to the
// last occurrence of the key.
#[test]
fn test_get_vs_materialize_consistency_with_duplicates() -> DeltaResult<()> {
    // "a" appears three times; the final value ("5") should win everywhere.
    let map_array = create_map_array(vec![vec![
        ("a", Some("1")),
        ("b", Some("2")),
        ("a", Some("3")),
        ("c", Some("4")),
        ("a", Some("5")),
    ]]);
    let item = map_item_from(&map_array, 0);
    let materialized = item.materialize();
    // Three distinct keys survive materialization.
    assert_eq!(materialized.len(), 3);
    assert_eq!(materialized.get("a"), Some(&"5".to_string()));
    assert_eq!(materialized.get("b"), Some(&"2".to_string()));
    assert_eq!(materialized.get("c"), Some(&"4".to_string()));
    // Point lookups agree with the materialized view.
    assert_eq!(item.get("a"), Some("5"));
    assert_eq!(item.get("b"), Some("2"));
    assert_eq!(item.get("c"), Some("4"));
    Ok(())
}
// A null map row is surfaced as None by get_map(), and neighboring rows are
// unaffected by the null.
#[test]
fn test_materialize_null_map() -> DeltaResult<()> {
    // Hand-build a 3-row map array whose middle row is null:
    //   row 0: {a: 1, b: 2}   row 1: null (covers "c")   row 2: {d: 4}
    let entry_fields: Vec<Arc<ArrowField>> = vec![
        Arc::new(ArrowField::new("keys", ArrowDataType::Utf8, false)),
        Arc::new(ArrowField::new("values", ArrowDataType::Utf8, true)),
    ];
    let keys_array = Arc::new(StringArray::from(vec![
        Some("a"),
        Some("b"),
        Some("c"),
        Some("d"),
    ])) as Arc<dyn crate::arrow::array::Array>;
    let values_array = Arc::new(StringArray::from(vec![
        Some("1"),
        Some("2"),
        Some("3"),
        Some("4"),
    ])) as Arc<dyn crate::arrow::array::Array>;
    let entries_struct = StructArray::try_new(
        entry_fields.clone().into(),
        vec![keys_array, values_array],
        None,
    )
    .unwrap();
    let offsets_buffer = OffsetBuffer::new(vec![0i32, 2, 3, 4].into());
    // Validity buffer marks row 1 as null.
    let null_buffer = Some(crate::arrow::buffer::NullBuffer::from(vec![
        true, false, true,
    ]));
    let map_array = MapArray::try_new(
        Arc::new(ArrowField::new_struct("entries", entry_fields, false)),
        offsets_buffer,
        entries_struct,
        null_buffer,
        false,
    )
    .unwrap();
    // Row 0 materializes normally.
    let result0 = map_item_from(&map_array, 0).materialize();
    assert_eq!(result0.len(), 2);
    assert_eq!(result0.get("a"), Some(&"1".to_string()));
    assert_eq!(result0.get("b"), Some(&"2".to_string()));
    // The null row comes back as None through the typed accessor.
    let map_item_1: Option<MapItem<'_>> = map_array.get_map(1, "test")?;
    assert!(map_item_1.is_none());
    // Row 2 is intact despite the preceding null row.
    let result2 = map_item_from(&map_array, 2).materialize();
    assert_eq!(result2.len(), 1);
    assert_eq!(result2.get("d"), Some(&"4".to_string()));
    Ok(())
}
// get_str must work uniformly across all three Arrow string encodings:
// Utf8, LargeUtf8, and Utf8View.
#[rstest]
#[case::utf8(Arc::new(StringArray::from(vec![Some("alice"), None, Some("charlie")])) as ArrayRef)]
#[case::large_utf8(Arc::new(LargeStringArray::from(vec![Some("alice"), None, Some("charlie")])) as ArrayRef)]
#[case::utf8_view(Arc::new(StringViewArray::from(vec![Some("alice"), None, Some("charlie")])) as ArrayRef)]
fn test_visit_rows_string_types(#[case] values: ArrayRef) -> DeltaResult<()> {
// Schema data type is taken from the case's array so each encoding is
// exercised end-to-end through the visitor path.
let batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![ArrowField::new(
"name",
values.data_type().clone(),
true,
)])),
vec![values],
)?;
let arrow_data = ArrowEngineData::new(batch);
// Minimal visitor that collects the single string column row by row.
struct Visitor {
values: Vec<Option<String>>,
}
impl RowVisitor for Visitor {
fn selected_column_names_and_types(
&self,
) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES: LazyLock<Vec<ColumnName>> =
LazyLock::new(|| vec![ColumnName::new(["name"])]);
static TYPES: &[DataType] = &[DataType::STRING];
(&NAMES, TYPES)
}
fn visit<'a>(
&mut self,
row_count: usize,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<()> {
for i in 0..row_count {
self.values
.push(getters[0].get_str(i, "name")?.map(|s| s.to_string()));
}
Ok(())
}
}
let mut visitor = Visitor { values: vec![] };
arrow_data.visit_rows(&[ColumnName::new(["name"])], &mut visitor)?;
// Nulls survive the round-trip; non-null values are returned verbatim.
assert_eq!(
visitor.values,
vec![Some("alice".into()), None, Some("charlie".into())]
);
Ok(())
}
// get_binary must handle the LargeBinary encoding, including null slots and
// values containing embedded NUL bytes.
#[test]
fn test_visit_rows_large_binary() -> DeltaResult<()> {
let batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![ArrowField::new(
"data",
ArrowDataType::LargeBinary,
true,
)])),
// Includes a null row and a value with non-UTF8 bytes (\x00\x01).
vec![Arc::new(LargeBinaryArray::from(vec![
Some(b"hello" as &[u8]),
None,
Some(b"\x00\x01"),
]))],
)?;
let arrow_data = ArrowEngineData::new(batch);
// Minimal visitor that collects the single binary column row by row.
struct Visitor {
values: Vec<Option<Vec<u8>>>,
}
impl RowVisitor for Visitor {
fn selected_column_names_and_types(
&self,
) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES: LazyLock<Vec<ColumnName>> =
LazyLock::new(|| vec![ColumnName::new(["data"])]);
static TYPES: &[DataType] = &[DataType::BINARY];
(&NAMES, TYPES)
}
fn visit<'a>(
&mut self,
row_count: usize,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<()> {
for i in 0..row_count {
self.values
.push(getters[0].get_binary(i, "data")?.map(|b| b.to_vec()));
}
Ok(())
}
}
let mut visitor = Visitor { values: vec![] };
arrow_data.visit_rows(&[ColumnName::new(["data"])], &mut visitor)?;
// Bytes come back unmodified, null stays null.
assert_eq!(
visitor.values,
vec![Some(b"hello".to_vec()), None, Some(b"\x00\x01".to_vec())]
);
Ok(())
}
// Lists stored as ListView (explicit offset + size per row, rather than
// offset pairs) must be readable as ListItems through the visitor path.
#[test]
fn test_visit_rows_list_view() -> DeltaResult<()> {
let values = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
let field = Arc::new(ArrowField::new("item", ArrowDataType::Utf8, false));
// Row 0 = values[0..0+2] = ["a", "b"]; row 1 = values[2..2+1] = ["c"].
let offsets = ScalarBuffer::from(vec![0i32, 2]);
let sizes = ScalarBuffer::from(vec![2i32, 1]);
let list_view = ListViewArray::new(field.clone(), offsets, sizes, values, None);
let batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![ArrowField::new(
"tags",
list_view.data_type().clone(),
false,
)])),
vec![Arc::new(list_view)],
)?;
let arrow_data = ArrowEngineData::new(batch);
// Minimal visitor that materializes each row's list of strings.
struct Visitor {
values: Vec<Vec<String>>,
}
impl RowVisitor for Visitor {
fn selected_column_names_and_types(
&self,
) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES: LazyLock<Vec<ColumnName>> =
LazyLock::new(|| vec![ColumnName::new(["tags"])]);
static TYPES: LazyLock<Vec<DataType>> =
LazyLock::new(|| vec![ArrayType::new(DataType::STRING, false).into()]);
(&NAMES, &TYPES)
}
fn visit<'a>(
&mut self,
row_count: usize,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<()> {
for i in 0..row_count {
let list: ListItem<'_> = getters[0].get(i, "tags")?;
self.values.push(list.materialize());
}
Ok(())
}
}
let mut visitor = Visitor { values: vec![] };
arrow_data.visit_rows(&[ColumnName::new(["tags"])], &mut visitor)?;
// Rows come back with the exact slices described by offsets/sizes above.
assert_eq!(
visitor.values,
vec![
vec!["a".to_string(), "b".to_string()],
vec!["c".to_string()]
]
);
Ok(())
}
}