use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::avro::avro_schema::file::{Block, CompressedBlock, Compression};
use arrow2::io::avro::avro_schema::write::{compress, write_block, write_metadata};
use arrow2::io::avro::write;
use arrow2::types::months_days_ns;
use avro_schema::schema::{Field as AvroField, Record, Schema as AvroSchema};
use super::read::read_avro;
pub(super) fn schema() -> Schema {
Schema::from(vec![
Field::new("int64", DataType::Int64, false),
Field::new("int64 nullable", DataType::Int64, true),
Field::new("utf8", DataType::Utf8, false),
Field::new("utf8 nullable", DataType::Utf8, true),
Field::new("int32", DataType::Int32, false),
Field::new("int32 nullable", DataType::Int32, true),
Field::new("date", DataType::Date32, false),
Field::new("date nullable", DataType::Date32, true),
Field::new("binary", DataType::Binary, false),
Field::new("binary nullable", DataType::Binary, true),
Field::new("fs binary", DataType::FixedSizeBinary(3), false),
Field::new("fs binary nullable", DataType::FixedSizeBinary(3), true),
Field::new("float32", DataType::Float32, false),
Field::new("float32 nullable", DataType::Float32, true),
Field::new("float64", DataType::Float64, false),
Field::new("float64 nullable", DataType::Float64, true),
Field::new("boolean", DataType::Boolean, false),
Field::new("boolean nullable", DataType::Boolean, true),
Field::new(
"interval",
DataType::Interval(IntervalUnit::MonthDayNano),
false,
),
Field::new(
"interval nullable",
DataType::Interval(IntervalUnit::MonthDayNano),
true,
),
Field::new(
"list",
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
false,
),
Field::new(
"list nullable",
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
),
])
}
pub(super) fn data() -> Chunk<Box<dyn Array>> {
let list_dt = DataType::List(Box::new(Field::new("item", DataType::Int32, true)));
let list_dt1 = DataType::List(Box::new(Field::new("item", DataType::Int32, true)));
let columns = vec![
Box::new(Int64Array::from_slice([27, 47])) as Box<dyn Array>,
Box::new(Int64Array::from([Some(27), None])),
Box::new(Utf8Array::<i32>::from_slice(["foo", "bar"])),
Box::new(Utf8Array::<i32>::from([Some("foo"), None])),
Box::new(Int32Array::from_slice([1, 1])),
Box::new(Int32Array::from([Some(1), None])),
Box::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)),
Box::new(Int32Array::from([Some(1), None]).to(DataType::Date32)),
Box::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])),
Box::new(BinaryArray::<i32>::from([Some(b"foo"), None])),
Box::new(FixedSizeBinaryArray::from_slice([[1, 2, 3], [1, 2, 3]])),
Box::new(FixedSizeBinaryArray::from([Some([1, 2, 3]), None])),
Box::new(PrimitiveArray::<f32>::from_slice([1.0, 2.0])),
Box::new(PrimitiveArray::<f32>::from([Some(1.0), None])),
Box::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])),
Box::new(PrimitiveArray::<f64>::from([Some(1.0), None])),
Box::new(BooleanArray::from_slice([true, false])),
Box::new(BooleanArray::from([Some(true), None])),
Box::new(PrimitiveArray::<months_days_ns>::from_slice([
months_days_ns::new(1, 1, 10 * 1_000_000), months_days_ns::new(2, 2, 20 * 1_000_000), ])),
Box::new(PrimitiveArray::<months_days_ns>::from([
Some(months_days_ns::new(1, 1, 10 * 1_000_000)), None,
])),
Box::new(ListArray::<i32>::new(
list_dt,
vec![0, 2, 5].try_into().unwrap(),
Box::new(PrimitiveArray::<i32>::from([
None,
Some(1),
None,
Some(3),
Some(4),
])),
None,
)),
Box::new(ListArray::<i32>::new(
list_dt1,
vec![0, 2, 2].try_into().unwrap(),
Box::new(PrimitiveArray::<i32>::from([None, Some(1)])),
Some([true, false].into()),
)),
];
Chunk::new(columns)
}
pub(super) fn serialize_to_block<R: AsRef<dyn Array>>(
columns: &Chunk<R>,
schema: &Schema,
compression: Option<Compression>,
) -> Result<CompressedBlock> {
let record = write::to_record(schema)?;
let mut serializers = columns
.arrays()
.iter()
.map(|x| x.as_ref())
.zip(record.fields.iter())
.map(|(array, field)| write::new_serializer(array, &field.schema))
.collect::<Vec<_>>();
let mut block = Block::new(columns.len(), vec![]);
write::serialize(&mut serializers, &mut block);
let mut compressed_block = CompressedBlock::default();
compress(&mut block, &mut compressed_block, compression)?;
Ok(compressed_block)
}
fn write_avro<R: AsRef<dyn Array>>(
columns: &Chunk<R>,
schema: &Schema,
compression: Option<Compression>,
) -> Result<Vec<u8>> {
let compressed_block = serialize_to_block(columns, schema, compression)?;
let avro_fields = write::to_record(schema)?;
let mut file = vec![];
write_metadata(&mut file, avro_fields, compression)?;
write_block(&mut file, &compressed_block)?;
Ok(file)
}
fn roundtrip(compression: Option<Compression>) -> Result<()> {
let expected = data();
let expected_schema = schema();
let data = write_avro(&expected, &expected_schema, compression)?;
let (result, read_schema) = read_avro(&data, None)?;
assert_eq!(expected_schema, read_schema);
for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) {
assert_eq!(c1.as_ref(), c2.as_ref());
}
Ok(())
}
#[test]
fn no_compression() -> Result<()> {
roundtrip(None)
}
#[cfg(feature = "io_avro_compression")]
#[test]
fn snappy() -> Result<()> {
roundtrip(Some(Compression::Snappy))
}
#[cfg(feature = "io_avro_compression")]
#[test]
fn deflate() -> Result<()> {
roundtrip(Some(Compression::Deflate))
}
fn large_format_schema() -> Schema {
Schema::from(vec![
Field::new("large_utf8", DataType::LargeUtf8, false),
Field::new("large_utf8_nullable", DataType::LargeUtf8, true),
Field::new("large_binary", DataType::LargeBinary, false),
Field::new("large_binary_nullable", DataType::LargeBinary, true),
])
}
fn large_format_data() -> Chunk<Box<dyn Array>> {
let columns = vec![
Box::new(Utf8Array::<i64>::from_slice(["a", "b"])) as Box<dyn Array>,
Box::new(Utf8Array::<i64>::from([Some("a"), None])),
Box::new(BinaryArray::<i64>::from_slice([b"foo", b"bar"])),
Box::new(BinaryArray::<i64>::from([Some(b"foo"), None])),
];
Chunk::new(columns)
}
fn large_format_expected_schema() -> Schema {
Schema::from(vec![
Field::new("large_utf8", DataType::Utf8, false),
Field::new("large_utf8_nullable", DataType::Utf8, true),
Field::new("large_binary", DataType::Binary, false),
Field::new("large_binary_nullable", DataType::Binary, true),
])
}
fn large_format_expected_data() -> Chunk<Box<dyn Array>> {
let columns = vec![
Box::new(Utf8Array::<i32>::from_slice(["a", "b"])) as Box<dyn Array>,
Box::new(Utf8Array::<i32>::from([Some("a"), None])),
Box::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])),
Box::new(BinaryArray::<i32>::from([Some(b"foo"), None])),
];
Chunk::new(columns)
}
#[test]
fn check_large_format() -> Result<()> {
let write_schema = large_format_schema();
let write_data = large_format_data();
let data = write_avro(&write_data, &write_schema, None)?;
let (result, read_schame) = read_avro(&data, None)?;
let expected_schema = large_format_expected_schema();
assert_eq!(read_schame, expected_schema);
let expected_data = large_format_expected_data();
for (c1, c2) in result.columns().iter().zip(expected_data.columns().iter()) {
assert_eq!(c1.as_ref(), c2.as_ref());
}
Ok(())
}
fn struct_schema() -> Schema {
Schema::from(vec![
Field::new(
"struct",
DataType::Struct(vec![
Field::new("item1", DataType::Int32, false),
Field::new("item2", DataType::Int32, true),
]),
false,
),
Field::new(
"struct nullable",
DataType::Struct(vec![
Field::new("item1", DataType::Int32, false),
Field::new("item2", DataType::Int32, true),
]),
true,
),
])
}
fn struct_data() -> Chunk<Box<dyn Array>> {
let struct_dt = DataType::Struct(vec![
Field::new("item1", DataType::Int32, false),
Field::new("item2", DataType::Int32, true),
]);
Chunk::new(vec![
Box::new(StructArray::new(
struct_dt.clone(),
vec![
Box::new(PrimitiveArray::<i32>::from_slice([1, 2])),
Box::new(PrimitiveArray::<i32>::from([None, Some(1)])),
],
None,
)),
Box::new(StructArray::new(
struct_dt,
vec![
Box::new(PrimitiveArray::<i32>::from_slice([1, 2])),
Box::new(PrimitiveArray::<i32>::from([None, Some(1)])),
],
Some([true, false].into()),
)),
])
}
fn avro_record() -> Record {
Record {
name: "".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "struct".to_string(),
doc: None,
schema: AvroSchema::Record(Record {
name: "r1".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "item1".to_string(),
doc: None,
schema: AvroSchema::Int(None),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "item2".to_string(),
doc: None,
schema: AvroSchema::Union(vec![
AvroSchema::Null,
AvroSchema::Int(None),
]),
default: None,
order: None,
aliases: vec![],
},
],
}),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "struct nullable".to_string(),
doc: None,
schema: AvroSchema::Union(vec![
AvroSchema::Null,
AvroSchema::Record(Record {
name: "r2".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "item1".to_string(),
doc: None,
schema: AvroSchema::Int(None),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "item2".to_string(),
doc: None,
schema: AvroSchema::Union(vec![
AvroSchema::Null,
AvroSchema::Int(None),
]),
default: None,
order: None,
aliases: vec![],
},
],
}),
]),
default: None,
order: None,
aliases: vec![],
},
],
}
}
#[test]
fn avro_record_schema() -> Result<()> {
let arrow_schema = struct_schema();
let record = write::to_record(&arrow_schema)?;
assert_eq!(record, avro_record());
Ok(())
}
#[test]
fn struct_() -> Result<()> {
let write_schema = struct_schema();
let write_data = struct_data();
let data = write_avro(&write_data, &write_schema, None)?;
let (result, read_schema) = read_avro(&data, None)?;
let expected_schema = struct_schema();
assert_eq!(read_schema, expected_schema);
let expected_data = struct_data();
for (c1, c2) in result.columns().iter().zip(expected_data.columns().iter()) {
assert_eq!(c1.as_ref(), c2.as_ref());
}
Ok(())
}