#![deny(missing_docs)]
use std::sync::Arc;
use marrow::{datatypes::Field, error::MarrowError, view::View};
use serde::{Deserialize, Serialize};
use crate::{
_impl::arrow::{
array::{Array, ArrayRef, RecordBatch},
datatypes::{Field as ArrowField, FieldRef, Schema},
},
internal::{
array_builder::ArrayBuilder,
deserializer::Deserializer,
error::{fail, Error, ErrorKind, Result},
schema::extensions::{Bool8Field, FixedShapeTensorField, VariableShapeTensorField},
schema::{SchemaLike, Sealed, SerdeArrowSchema, TracingOptions},
serializer::Serializer,
},
};
/// Serialize `items` into Arrow arrays described by `fields`.
///
/// An [`ArrayBuilder`] is created from `fields`, `items` is serialized into it,
/// and the filled builder is converted into a `Vec<ArrayRef>`.
///
/// # Errors
///
/// Fails if `fields` cannot be converted into the internal field type, if
/// serializing `items` fails, or if the built arrays cannot be converted into
/// Arrow arrays.
pub fn to_arrow<T: Serialize>(fields: &[FieldRef], items: T) -> Result<Vec<ArrayRef>> {
    let serializer = Serializer::new(ArrayBuilder::from_arrow(fields)?);
    let filled = items.serialize(serializer)?.into_inner();
    filled.into_arrow()
}
/// Deserialize a value of type `T` from Arrow `arrays` described by `fields`.
///
/// `fields` and `arrays` must have the same length; this is checked by
/// [`Deserializer::from_arrow`], which this function delegates to.
///
/// # Errors
///
/// Fails if the deserializer cannot be constructed (for example when the
/// number of fields and arrays differ) or if deserializing `T` fails.
pub fn from_arrow<'de, T, A>(fields: &[FieldRef], arrays: &'de [A]) -> Result<T>
where
    T: Deserialize<'de>,
    A: AsRef<dyn Array>,
{
    let deserializer = Deserializer::from_arrow(fields, arrays)?;
    T::deserialize(deserializer)
}
/// Serialize `items` into a [`RecordBatch`] whose columns are described by
/// `fields`.
///
/// `T` may be unsized, so slices (`&[Item]`) can be passed directly in addition
/// to references to sized containers such as `&Vec<Item>`.
///
/// # Errors
///
/// Fails if `fields` cannot be converted into the internal field type, if
/// serializing `items` fails, or if the record batch cannot be assembled from
/// the built arrays.
pub fn to_record_batch<T: Serialize + ?Sized>(
    fields: &[FieldRef],
    items: &T,
) -> Result<RecordBatch> {
    let builder = ArrayBuilder::from_arrow(fields)?;
    items
        .serialize(Serializer::new(builder))?
        .into_inner()
        .into_record_batch()
}
/// Deserialize a value of type `T` from the columns of `record_batch`.
///
/// The batch's schema fields and columns are handed to
/// [`Deserializer::from_record_batch`].
///
/// # Errors
///
/// Fails if the deserializer cannot be constructed from the batch or if
/// deserializing `T` fails.
pub fn from_record_batch<'de, T: Deserialize<'de>>(record_batch: &'de RecordBatch) -> Result<T> {
    let deserializer = Deserializer::from_record_batch(record_batch)?;
    T::deserialize(deserializer)
}
impl crate::internal::array_builder::ArrayBuilder {
    /// Create a builder for the given Arrow `fields`.
    ///
    /// # Errors
    ///
    /// Fails if any field cannot be converted into the internal field type, or
    /// if the builder cannot be created for the resulting schema.
    pub fn from_arrow(fields: &[FieldRef]) -> Result<Self> {
        let schema = SerdeArrowSchema {
            fields: fields_from_field_refs(fields)?,
        };
        Self::new(schema)
    }

    /// Take the current contents out of the builder (via `take`) and convert
    /// them into Arrow arrays.
    ///
    /// NOTE(review): presumably leaves `self` empty and reusable; confirm
    /// against `ArrayBuilder::take`.
    pub fn to_arrow(&mut self) -> Result<Vec<ArrayRef>> {
        let taken = self.take();
        taken.into_arrow()
    }

    /// Consume the builder and convert its contents into Arrow arrays.
    ///
    /// The field metadata produced by the builder is discarded. Use
    /// [`into_record_batch`](Self::into_record_batch) to keep names,
    /// nullability and metadata.
    pub fn into_arrow(self) -> Result<Vec<ArrayRef>> {
        let (arrays, _) = self.into_arrays_and_field_metas()?;
        let converted = arrays
            .into_iter()
            .map(ArrayRef::try_from)
            .collect::<Result<Vec<ArrayRef>, MarrowError>>()?;
        Ok(converted)
    }

    /// Take the current contents out of the builder (via `take`) and assemble
    /// them into a [`RecordBatch`].
    pub fn to_record_batch(&mut self) -> Result<RecordBatch> {
        let taken = self.take();
        taken.into_record_batch()
    }

    /// Consume the builder and assemble its contents into a [`RecordBatch`].
    ///
    /// Each schema field takes its name, nullability and metadata from the
    /// builder's field metadata, and its data type from the converted column.
    ///
    /// # Errors
    ///
    /// Fails if the arrays cannot be converted into Arrow arrays, or if
    /// [`RecordBatch::try_new`] rejects the schema/column combination. The
    /// latter is reported with [`ErrorKind::Custom`].
    pub fn into_record_batch(self) -> Result<RecordBatch> {
        let (arrays, metas) = self.into_arrays_and_field_metas()?;
        let columns = arrays
            .into_iter()
            .map(ArrayRef::try_from)
            .collect::<Result<Vec<ArrayRef>, MarrowError>>()?;

        // Read the data type back from each converted column so that the schema
        // matches the arrays handed to `RecordBatch::try_new`.
        let fields: Vec<FieldRef> = columns
            .iter()
            .zip(metas)
            .map(|(column, meta)| {
                let field =
                    ArrowField::new(meta.name, column.data_type().clone(), meta.nullable)
                        .with_metadata(meta.metadata);
                FieldRef::new(field)
            })
            .collect();

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, columns)
            .map_err(|err| Error::new_from(ErrorKind::Custom, err.to_string(), err))
    }
}
impl<'de> Deserializer<'de> {
    /// Build a deserializer over Arrow `arrays` described by `fields`.
    ///
    /// The returned deserializer borrows the array data for `'de`.
    ///
    /// # Errors
    ///
    /// Fails if `fields` and `arrays` differ in length, if any field cannot be
    /// converted into the internal field type, or if any array cannot be
    /// turned into a `View`.
    pub fn from_arrow<A>(fields: &[FieldRef], arrays: &'de [A]) -> Result<Self>
    where
        A: AsRef<dyn Array>,
    {
        let (num_fields, num_arrays) = (fields.len(), arrays.len());
        if num_fields != num_arrays {
            fail!(
                "different number of fields ({}) and arrays ({})",
                num_fields,
                num_arrays
            );
        }

        let fields = fields_from_field_refs(fields)?;
        // Stops at the first array that cannot be viewed.
        let views = arrays
            .iter()
            .map(|array| View::try_from(array.as_ref()))
            .collect::<Result<Vec<_>, _>>()?;
        Deserializer::new(&fields, views)
    }

    /// Build a deserializer over the columns of `record_batch`, using the
    /// fields of the batch's schema.
    ///
    /// # Errors
    ///
    /// Same as [`Deserializer::from_arrow`].
    pub fn from_record_batch(record_batch: &'de RecordBatch) -> Result<Self> {
        let schema_ref = record_batch.schema();
        let columns = record_batch.columns();
        Self::from_arrow(schema_ref.fields(), columns)
    }
}
/// Convert Arrow field references into the internal `marrow` field type.
///
/// Conversion stops at the first field that cannot be converted.
fn fields_from_field_refs(fields: &[FieldRef]) -> Result<Vec<Field>> {
    let mut converted = Vec::with_capacity(fields.len());
    for field_ref in fields {
        converted.push(Field::try_from(field_ref.as_ref())?);
    }
    Ok(converted)
}
impl TryFrom<SerdeArrowSchema> for Vec<ArrowField> {
    type Error = Error;

    fn try_from(value: SerdeArrowSchema) -> Result<Self> {
        // Delegate to the borrowing conversion.
        Self::try_from(&value)
    }
}
impl<'a> TryFrom<&'a SerdeArrowSchema> for Vec<ArrowField> {
    type Error = Error;

    fn try_from(value: &'a SerdeArrowSchema) -> Result<Self> {
        // Convert each internal field, stopping at the first failure.
        let mut arrow_fields = Vec::with_capacity(value.fields.len());
        for field in &value.fields {
            arrow_fields.push(ArrowField::try_from(field)?);
        }
        Ok(arrow_fields)
    }
}
impl TryFrom<SerdeArrowSchema> for Vec<FieldRef> {
    type Error = Error;

    fn try_from(value: SerdeArrowSchema) -> Result<Self> {
        // Delegate to the borrowing conversion.
        Self::try_from(&value)
    }
}
impl<'a> TryFrom<&'a SerdeArrowSchema> for Vec<FieldRef> {
type Error = Error;
fn try_from(value: &'a SerdeArrowSchema) -> Result<Self> {
Ok(value
.fields
.iter()
.map(|f| Ok(Arc::new(ArrowField::try_from(f)?)))
.collect::<Result<_, MarrowError>>()?)
}
}
impl<'a> TryFrom<&'a [ArrowField]> for SerdeArrowSchema {
    type Error = Error;

    fn try_from(fields: &'a [ArrowField]) -> Result<Self> {
        // Convert each Arrow field, stopping at the first failure.
        let mut converted = Vec::with_capacity(fields.len());
        for field in fields {
            converted.push(Field::try_from(field)?);
        }
        Ok(Self { fields: converted })
    }
}
impl<'a> TryFrom<&'a [FieldRef]> for SerdeArrowSchema {
    type Error = Error;

    fn try_from(fields: &'a [FieldRef]) -> Result<Self, Self::Error> {
        // Convert each referenced Arrow field, stopping at the first failure.
        let mut converted = Vec::with_capacity(fields.len());
        for field_ref in fields {
            converted.push(Field::try_from(field_ref.as_ref())?);
        }
        Ok(Self { fields: converted })
    }
}
impl Sealed for Vec<ArrowField> {}

// Each constructor builds a `SerdeArrowSchema` first and then converts it.
impl SchemaLike for Vec<ArrowField> {
    fn from_value<T: Serialize>(value: T) -> Result<Self> {
        let schema = SerdeArrowSchema::from_value(value)?;
        Self::try_from(schema)
    }

    fn from_type<'de, T: Deserialize<'de>>(options: TracingOptions) -> Result<Self> {
        let schema = SerdeArrowSchema::from_type::<T>(options)?;
        Self::try_from(schema)
    }

    fn from_samples<T: Serialize>(samples: T, options: TracingOptions) -> Result<Self> {
        let schema = SerdeArrowSchema::from_samples(samples, options)?;
        Self::try_from(schema)
    }
}
impl Sealed for Vec<FieldRef> {}

// Each constructor builds a `SerdeArrowSchema` first and then converts it.
impl SchemaLike for Vec<FieldRef> {
    fn from_value<T: Serialize>(value: T) -> Result<Self> {
        let schema = SerdeArrowSchema::from_value(value)?;
        Self::try_from(schema)
    }

    fn from_type<'de, T: Deserialize<'de>>(options: TracingOptions) -> Result<Self> {
        let schema = SerdeArrowSchema::from_type::<T>(options)?;
        Self::try_from(schema)
    }

    fn from_samples<T: Serialize>(samples: T, options: TracingOptions) -> Result<Self> {
        let schema = SerdeArrowSchema::from_samples(samples, options)?;
        Self::try_from(schema)
    }
}
// Implements `TryFrom<&Ext>` and `TryFrom<Ext>` for `ArrowField` for an
// extension field type `Ext`, going through the intermediate `marrow` `Field`.
macro_rules! impl_try_from_ext_type {
    ($ty:ty) => {
        impl TryFrom<&$ty> for ArrowField {
            type Error = Error;

            fn try_from(value: &$ty) -> Result<Self, Self::Error> {
                let field = Field::try_from(value)?;
                let arrow_field = Self::try_from(&field)?;
                Ok(arrow_field)
            }
        }

        impl TryFrom<$ty> for ArrowField {
            type Error = Error;

            fn try_from(value: $ty) -> Result<Self, Self::Error> {
                // Delegate to the by-reference conversion above.
                <Self as TryFrom<&$ty>>::try_from(&value)
            }
        }
    };
}

impl_try_from_ext_type!(Bool8Field);
impl_try_from_ext_type!(FixedShapeTensorField);
impl_try_from_ext_type!(VariableShapeTensorField);