//! Integration between Parquet and the Variant logical type.
//!
//! Re-exports the APIs of the [`parquet_variant`] and
//! [`parquet_variant_compute`] crates.

pub use parquet_variant::*;
pub use parquet_variant_compute::*;

#[cfg(test)]
mod tests {
use crate::arrow::ArrowWriter;
use crate::arrow::arrow_reader::ArrowReaderBuilder;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::ChunkReader;
use arrow::util::test_util::parquet_test_data;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::Schema;
use bytes::Bytes;
use parquet_variant::{Variant, VariantBuilderExt};
use parquet_variant_compute::{VariantArray, VariantArrayBuilder, VariantType};
use std::path::PathBuf;
use std::sync::Arc;
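
/// Round trips a simple [`VariantArray`] through a Parquet file and back.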
#[test]
fn roundtrip_basic() {
roundtrip(variant_array());
}
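
/// Reads a shredded variant test file and verifies the "var" column is
/// read back as a [`VariantArray`] containing the expected value.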
#[test]
fn read_logical_type() {
let batch = read_shredded_variant_test_case("case-075.parquet");
assert_variant_metadata(&batch, "var");
let var_column = batch.column_by_name("var").expect("expected var column");
let var_array =
VariantArray::try_new(&var_column).expect("expected var column to be a VariantArray");
assert_eq!(var_array.len(), 1);
assert!(var_array.is_valid(0));
let var_value = var_array.value(0);
assert_eq!(var_value, Variant::from("iceberg"));
}
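
/// Verifies that writing a [`VariantArray`] produces a Parquet schema whose
/// column is annotated with the `Variant` logical type.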
#[test]
fn write_logical_type() {
let array = variant_array();
let batch = variant_array_to_batch(array);
let buffer = write_to_buffer(&batch);
let metadata = read_metadata(&Bytes::from(buffer));
let schema = metadata.file_metadata().schema_descr();
let fields = schema.root_schema().get_fields();
assert_eq!(fields.len(), 1);
let field = &fields[0];
assert_eq!(field.name(), "data");
assert_eq!(
field.get_basic_info().logical_type_ref(),
Some(&crate::basic::LogicalType::Variant {
specification_version: None
})
);
}
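
/// Returns a three row [`VariantArray`] containing an object, a string, and a null.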
fn variant_array() -> VariantArray {
let mut builder = VariantArrayBuilder::new(3);
builder.new_object().with_field("name", "Alice").finish();
builder.append_value("such wow");
builder.append_null();
builder.build()
}
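
/// Writes the [`VariantArray`] to an in-memory Parquet file, reads it back,
/// and asserts the result (including extension metadata) matches the input.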
fn roundtrip(array: VariantArray) {
let source_batch = variant_array_to_batch(array);
assert_variant_metadata(&source_batch, "data");
let buffer = write_to_buffer(&source_batch);
let result_batch = read_to_batch(Bytes::from(buffer));
assert_variant_metadata(&result_batch, "data");
assert_eq!(result_batch, source_batch);
}
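
/// Wraps the [`VariantArray`] in a single column [`RecordBatch`] with a column named "data".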
fn variant_array_to_batch(array: VariantArray) -> RecordBatch {
let field = array.field("data");
let schema = Schema::new(vec![field]);
RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)]).unwrap()
}
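
/// Writes the [`RecordBatch`] to an in-memory Parquet file and returns the encoded bytes.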
fn write_to_buffer(batch: &RecordBatch) -> Vec<u8> {
let mut buffer = vec![];
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
writer.write(batch).unwrap();
writer.close().unwrap();
buffer
}
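
/// Reads the Parquet footer metadata from the given input.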
fn read_metadata<T: ChunkReader + 'static>(input: &T) -> ParquetMetaData {
let mut reader = ParquetMetaDataReader::new();
reader.try_parse(input).unwrap();
reader.finish().unwrap()
}
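
/// Reads all rows from the Parquet input into a single [`RecordBatch`].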
fn read_to_batch<T: ChunkReader + 'static>(reader: T) -> RecordBatch {
let reader = ArrowReaderBuilder::try_new(reader)
.unwrap()
.build()
.unwrap();
let mut batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(batches.len(), 1);
batches.swap_remove(0)
}
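
/// Asserts that the named field carries the `arrow.parquet.variant` extension
/// type metadata and can be interpreted as a [`VariantType`].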
fn assert_variant_metadata(batch: &RecordBatch, field_name: &str) {
let schema = batch.schema();
let field = schema
.field_with_name(field_name)
.expect("could not find expected field");
let metadata_value = field
.metadata()
.get("ARROW:extension:name")
.expect("metadata does not exist");
assert_eq!(metadata_value, "arrow.parquet.variant");
field
.try_extension_type::<VariantType>()
.expect("VariantExtensionType should be readable");
}
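
/// Reads the named test case from the `shredded_variant` directory of the
/// parquet testing data into a [`RecordBatch`].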
fn read_shredded_variant_test_case(name: &str) -> RecordBatch {
let case_file = PathBuf::from(parquet_test_data())
.join("..") .join("shredded_variant")
.join(name);
let case_file = std::fs::File::open(case_file).unwrap();
read_to_batch(case_file)
}
}