use std::convert::TryInto;
use std::sync::Arc;
use avro_rs::Schema as AvroSchema;
use crate::array::*;
use crate::datatypes::*;
use crate::error::ArrowError;
use crate::error::Result;
use crate::record_batch::RecordBatch;
use crate::types::months_days_ns;
use super::nested::*;
use super::util;
/// Creates an empty mutable Arrow array able to hold values of `data_type`.
///
/// `avro_schema` is the Avro schema the values will be read from. It is required for
/// dictionary-encoded types, whose dictionary values are the symbols of an Avro `enum`.
/// For a `List`, when `avro_schema` is an Avro `array` its item schema is forwarded to the
/// child array; otherwise the child is built without an Avro schema.
/// `capacity` is passed to the `with_capacity` constructor of the top-level array.
///
/// # Errors
/// Returns [`ArrowError::NotYetImplemented`] when `data_type` is not supported, or when a
/// dictionary type is not backed by an Avro `enum` schema.
fn make_mutable(
    data_type: &DataType,
    avro_schema: Option<&AvroSchema>,
    capacity: usize,
) -> Result<Box<dyn MutableArray>> {
    Ok(match data_type.to_physical_type() {
        PhysicalType::Boolean => {
            Box::new(MutableBooleanArray::with_capacity(capacity)) as Box<dyn MutableArray>
        }
        PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
            Box::new(MutablePrimitiveArray::<$T>::with_capacity(capacity).to(data_type.clone()))
                as Box<dyn MutableArray>
        }),
        PhysicalType::Binary => {
            Box::new(MutableBinaryArray::<i32>::with_capacity(capacity)) as Box<dyn MutableArray>
        }
        PhysicalType::Utf8 => {
            Box::new(MutableUtf8Array::<i32>::with_capacity(capacity)) as Box<dyn MutableArray>
        }
        PhysicalType::Dictionary(_) => match avro_schema {
            // The dictionary values are fixed up-front to the enum's symbols; the
            // per-row data is only the index into them.
            Some(AvroSchema::Enum { symbols, .. }) => {
                let values = Utf8Array::<i32>::from_slice(symbols);
                Box::new(FixedItemsUtf8Dictionary::with_capacity(values, capacity))
                    as Box<dyn MutableArray>
            }
            // Reachable from user input (e.g. a union or a missing schema), so report it
            // instead of panicking.
            _ => {
                return Err(ArrowError::NotYetImplemented(format!(
                    "Deserializing type {:?} is only supported from an Avro enum schema",
                    data_type
                )))
            }
        },
        _ => match data_type {
            DataType::List(inner) => {
                // Forward the item schema when the field is an Avro array so that items
                // which need it (e.g. enums) can be built.
                let inner_avro = match avro_schema {
                    Some(AvroSchema::Array(items)) => Some(&**items),
                    _ => None,
                };
                let values = make_mutable(inner.data_type(), inner_avro, 0)?;
                Box::new(DynMutableListArray::<i32>::new_with_capacity(
                    values, capacity,
                )) as Box<dyn MutableArray>
            }
            DataType::FixedSizeBinary(size) => Box::new(MutableFixedSizeBinaryArray::with_capacity(
                *size as usize,
                capacity,
            )) as Box<dyn MutableArray>,
            other => {
                return Err(ArrowError::NotYetImplemented(format!(
                    "Deserializing type {:?} is still not implemented",
                    other
                )))
            }
        },
    })
}
#[inline]
fn deserialize_item<'a>(
array: &mut dyn MutableArray,
is_nullable: bool,
mut block: &'a [u8],
) -> Result<&'a [u8]> {
if is_nullable {
if util::zigzag_i64(&mut block)? == 0 {
array.push_null();
return Ok(block);
}
}
let data_type = array.data_type();
match data_type {
DataType::List(inner) => {
let is_nullable = inner.is_nullable();
let array = array
.as_mut_any()
.downcast_mut::<DynMutableListArray<i32>>()
.unwrap();
loop {
let len = util::zigzag_i64(&mut block)? as usize;
if len == 0 {
break;
}
let values = array.mut_values();
for _ in 0..len {
block = deserialize_item(values, is_nullable, block)?;
}
array.try_push_valid()?;
}
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
let data = &block[..12];
block = &block[12..];
let value = months_days_ns::new(
i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64 * 1_000_000,
);
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
.unwrap();
array.push(Some(value))
}
_ => match data_type.to_physical_type() {
PhysicalType::Boolean => {
let is_valid = block[0] == 1;
block = &block[1..];
let array = array
.as_mut_any()
.downcast_mut::<MutableBooleanArray>()
.unwrap();
array.push(Some(is_valid))
}
PhysicalType::Primitive(primitive) => match primitive {
PrimitiveType::Int32 => {
let value = util::zigzag_i64(&mut block)? as i32;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i32>>()
.unwrap();
array.push(Some(value))
}
PrimitiveType::Int64 => {
let value = util::zigzag_i64(&mut block)? as i64;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i64>>()
.unwrap();
array.push(Some(value))
}
PrimitiveType::Float32 => {
let value =
f32::from_le_bytes(block[..std::mem::size_of::<f32>()].try_into().unwrap());
block = &block[std::mem::size_of::<f32>()..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<f32>>()
.unwrap();
array.push(Some(value))
}
PrimitiveType::Float64 => {
let value =
f64::from_le_bytes(block[..std::mem::size_of::<f64>()].try_into().unwrap());
block = &block[std::mem::size_of::<f64>()..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<f64>>()
.unwrap();
array.push(Some(value))
}
_ => unreachable!(),
},
PhysicalType::Utf8 => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = simdutf8::basic::from_utf8(&block[..len])?;
block = &block[len..];
let array = array
.as_mut_any()
.downcast_mut::<MutableUtf8Array<i32>>()
.unwrap();
array.push(Some(data))
}
PhysicalType::Binary => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = &block[..len];
block = &block[len..];
let array = array
.as_mut_any()
.downcast_mut::<MutableBinaryArray<i32>>()
.unwrap();
array.push(Some(data));
}
PhysicalType::FixedSizeBinary => {
let array = array
.as_mut_any()
.downcast_mut::<MutableFixedSizeBinaryArray>()
.unwrap();
let len = array.size();
let data = &block[..len];
block = &block[len..];
array.push(Some(data));
}
PhysicalType::Dictionary(_) => {
let index = util::zigzag_i64(&mut block)? as i32;
let array = array
.as_mut_any()
.downcast_mut::<FixedItemsUtf8Dictionary>()
.unwrap();
array.push_valid(index);
}
_ => todo!(),
},
};
Ok(block)
}
/// Decodes `rows` consecutive Avro records from `block` into a [`RecordBatch`].
///
/// `avro_schemas` is paired positionally with the fields of `schema`. It is used only to
/// build each column's mutable array, e.g. the symbols of an Avro enum. Records are decoded
/// one at a time, and within a record the fields are decoded in schema order.
///
/// # Errors
/// Returns the first error raised while creating a column, decoding a value, or assembling
/// the final batch.
pub fn deserialize(
    mut block: &[u8],
    rows: usize,
    schema: Arc<Schema>,
    avro_schemas: &[AvroSchema],
) -> Result<RecordBatch> {
    let fields = schema.fields();

    // One growable column per (field, Avro schema) pair, each pre-sized for `rows` values.
    let mut columns: Vec<Box<dyn MutableArray>> = Vec::with_capacity(fields.len());
    for (field, avro_schema) in fields.iter().zip(avro_schemas) {
        columns.push(make_mutable(
            field.data_type().to_logical_type(),
            Some(avro_schema),
            rows,
        )?);
    }

    for _row in 0..rows {
        for (field, column) in fields.iter().zip(columns.iter_mut()) {
            block = deserialize_item(column.as_mut(), field.is_nullable(), block)?;
        }
    }

    let arrays = columns.iter_mut().map(|column| column.as_arc()).collect();
    RecordBatch::try_new(schema, arrays)
}