use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::types::NativeType;
use arrow2::{array::Array, buffer::Buffer};
use chrono::{NaiveDate, NaiveDateTime};
use std::sync::Arc;
use crate::field::*;
/// Implemented by types that can be serialized into an arrow2 mutable array.
///
/// The Rust value accepted by `arrow_serialize` is the `Type` associated with
/// the `ArrowField` supertrait, which may differ from `Self`.
pub trait ArrowSerialize: ArrowField {
/// The arrow2 mutable array that serialized values are appended to.
type MutableArrayType: arrow2::array::MutableArray;
/// Creates the mutable array that `arrow_serialize` appends into.
fn new_array() -> Self::MutableArrayType;
/// Appends `v` to `array`, reporting failure through the arrow2 error type.
fn arrow_serialize(
v: &<Self as ArrowField>::Type,
array: &mut Self::MutableArrayType,
) -> arrow2::error::Result<()>;
}
/// Generates an [`ArrowSerialize`] implementation for a primitive numeric type,
/// backed by a `MutablePrimitiveArray` of that same type.
macro_rules! impl_numeric_type {
    ($native:ty) => {
        impl ArrowSerialize for $native {
            type MutableArrayType = MutablePrimitiveArray<$native>;

            /// Starts from the default (empty) primitive array.
            #[inline]
            fn new_array() -> Self::MutableArrayType {
                Default::default()
            }

            /// Copies the value out of the reference and appends it as a valid slot.
            #[inline]
            fn arrow_serialize(
                v: &Self,
                array: &mut Self::MutableArrayType,
            ) -> arrow2::error::Result<()> {
                let value = *v;
                array.try_push(Some(value))
            }
        }
    };
}
/// Optional values reuse the inner type's array; `None` becomes a null slot.
impl<T> ArrowSerialize for Option<T>
where
    T: ArrowSerialize,
{
    type MutableArrayType = <T as ArrowSerialize>::MutableArrayType;

    /// Delegates array construction to the wrapped type.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        <T as ArrowSerialize>::new_array()
    }

    /// Pushes a null for `None`, otherwise serializes the contained value.
    #[inline]
    fn arrow_serialize(
        v: &<Self as ArrowField>::Type,
        array: &mut Self::MutableArrayType,
    ) -> arrow2::error::Result<()> {
        match v {
            None => {
                array.push_null();
                Ok(())
            }
            Some(inner) => <T as ArrowSerialize>::arrow_serialize(inner, array),
        }
    }
}
// Generate `ArrowSerialize` implementations for the unsigned, signed and
// floating-point numeric types via `impl_numeric_type!`.
impl_numeric_type!(u8);
impl_numeric_type!(u16);
impl_numeric_type!(u32);
impl_numeric_type!(u64);
impl_numeric_type!(i8);
impl_numeric_type!(i16);
impl_numeric_type!(i32);
impl_numeric_type!(i64);
impl_numeric_type!(arrow2::types::f16);
impl_numeric_type!(f32);
impl_numeric_type!(f64);
impl<const PRECISION: usize, const SCALE: usize> ArrowSerialize for I128<PRECISION, SCALE> {
type MutableArrayType = MutablePrimitiveArray<i128>;
#[inline]
fn new_array() -> Self::MutableArrayType {
Self::MutableArrayType::default()
}
#[inline]
fn arrow_serialize(v: &i128, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
array.try_push(Some(*v))
}
}
/// Strings are stored in a UTF-8 array with 32-bit offsets.
impl ArrowSerialize for String {
    type MutableArrayType = MutableUtf8Array<i32>;

    /// Starts from the default (empty) UTF-8 array.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Default::default()
    }

    /// Appends the string contents as a valid slot.
    #[inline]
    fn arrow_serialize(v: &Self, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
        let text: &str = v.as_str();
        array.try_push(Some(text))
    }
}
/// Strings are stored in a UTF-8 array with 64-bit offsets.
impl ArrowSerialize for LargeString {
    type MutableArrayType = MutableUtf8Array<i64>;

    /// Starts from the default (empty) large UTF-8 array.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Default::default()
    }

    /// Appends the string contents as a valid slot.
    #[inline]
    fn arrow_serialize(
        v: &String,
        array: &mut Self::MutableArrayType,
    ) -> arrow2::error::Result<()> {
        let text: &str = v.as_str();
        array.try_push(Some(text))
    }
}
/// Booleans are stored in a `MutableBooleanArray`.
impl ArrowSerialize for bool {
    type MutableArrayType = MutableBooleanArray;

    /// Starts from the default (empty) boolean array.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Default::default()
    }

    /// Appends the flag as a valid slot.
    #[inline]
    fn arrow_serialize(v: &Self, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
        let flag = *v;
        array.try_push(Some(flag))
    }
}
impl ArrowSerialize for NaiveDateTime {
type MutableArrayType = MutablePrimitiveArray<i64>;
#[inline]
fn new_array() -> Self::MutableArrayType {
Self::MutableArrayType::from(<Self as ArrowField>::data_type())
}
#[inline]
fn arrow_serialize(v: &Self, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
array.try_push(Some(v.timestamp_nanos()))
}
}
/// Serializes a `NaiveDate` as an `i32` day count relative to arrow2's epoch offset.
impl ArrowSerialize for NaiveDate {
    type MutableArrayType = MutablePrimitiveArray<i32>;

    /// Builds the array with this field's declared data type.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Self::MutableArrayType::from(<Self as ArrowField>::data_type())
    }

    /// Appends the day count of `v` minus `EPOCH_DAYS_FROM_CE`.
    #[inline]
    fn arrow_serialize(v: &Self, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
        let days_from_ce = chrono::Datelike::num_days_from_ce(v);
        let days_since_epoch = days_from_ce - arrow2::temporal_conversions::EPOCH_DAYS_FROM_CE;
        array.try_push(Some(days_since_epoch))
    }
}
/// Byte buffers are stored in a binary array with 32-bit offsets.
impl ArrowSerialize for Buffer<u8> {
    type MutableArrayType = MutableBinaryArray<i32>;

    /// Starts from the default (empty) binary array.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Default::default()
    }

    /// Appends the buffer's bytes as a valid slot.
    #[inline]
    fn arrow_serialize(v: &Self, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
        let bytes = v.as_slice();
        array.try_push(Some(bytes))
    }
}
/// Byte vectors are stored in a binary array with 32-bit offsets.
impl ArrowSerialize for Vec<u8> {
    type MutableArrayType = MutableBinaryArray<i32>;

    /// Starts from the default (empty) binary array.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Default::default()
    }

    /// Appends the vector's bytes as a valid slot.
    #[inline]
    fn arrow_serialize(v: &Self, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
        let bytes: &[u8] = v.as_slice();
        array.try_push(Some(bytes))
    }
}
/// Byte vectors are stored in a binary array with 64-bit offsets.
impl ArrowSerialize for LargeBinary {
    type MutableArrayType = MutableBinaryArray<i64>;

    /// Starts from the default (empty) large binary array.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Default::default()
    }

    /// Appends the vector's bytes as a valid slot.
    #[inline]
    fn arrow_serialize(
        v: &Vec<u8>,
        array: &mut Self::MutableArrayType,
    ) -> arrow2::error::Result<()> {
        let bytes: &[u8] = v.as_slice();
        array.try_push(Some(bytes))
    }
}
impl<const SIZE: usize> ArrowSerialize for FixedSizeBinary<SIZE> {
type MutableArrayType = MutableFixedSizeBinaryArray;
#[inline]
fn new_array() -> Self::MutableArrayType {
Self::MutableArrayType::new(SIZE)
}
#[inline]
fn arrow_serialize(
v: &Vec<u8>,
array: &mut Self::MutableArrayType,
) -> arrow2::error::Result<()> {
array.try_push(Some(v))
}
}
/// Buffers of native values become one list entry (32-bit offsets) whose
/// child is a primitive array of `T`.
impl<T> ArrowSerialize for Buffer<T>
where
    T: NativeType + ArrowSerialize + ArrowEnableVecForType,
{
    type MutableArrayType = MutableListArray<i32, MutablePrimitiveArray<T>>;

    /// Creates an empty list array whose child field is named `"item"` and
    /// takes its nullability from `T`.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        let child = MutablePrimitiveArray::new();
        let child_nullable = <T as ArrowField>::is_nullable();
        Self::MutableArrayType::new_with_field(child, "item", child_nullable)
    }

    /// Bulk-copies the buffer into the child array, then closes the list slot.
    #[inline]
    fn arrow_serialize(
        v: &<Self as ArrowField>::Type,
        array: &mut Self::MutableArrayType,
    ) -> arrow2::error::Result<()> {
        let elements = v.as_slice();
        let child = array.mut_values();
        child.reserve(elements.len());
        child.extend_from_slice(elements);
        array.try_push_valid()
    }
}
/// Vectors become one list entry (32-bit offsets) whose child array is the
/// element type's own mutable array.
impl<T> ArrowSerialize for Vec<T>
where
    T: ArrowSerialize + ArrowEnableVecForType + 'static,
    <T as ArrowSerialize>::MutableArrayType: Default,
{
    type MutableArrayType = MutableListArray<i32, <T as ArrowSerialize>::MutableArrayType>;

    /// Creates an empty list array whose child field is named `"item"` and
    /// takes its nullability from `T`.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        let child = <T as ArrowSerialize>::new_array();
        let child_nullable = <T as ArrowField>::is_nullable();
        Self::MutableArrayType::new_with_field(child, "item", child_nullable)
    }

    /// Serializes every element into the child array, stopping at the first
    /// error, then closes the list slot.
    fn arrow_serialize(
        v: &<Self as ArrowField>::Type,
        array: &mut Self::MutableArrayType,
    ) -> arrow2::error::Result<()> {
        let child = array.mut_values();
        v.iter()
            .try_for_each(|element| <T as ArrowSerialize>::arrow_serialize(element, child))?;
        array.try_push_valid()
    }
}
/// Large vectors become one list entry (64-bit offsets) whose child array is
/// the element type's own mutable array.
impl<T> ArrowSerialize for LargeVec<T>
where
    T: ArrowSerialize + ArrowEnableVecForType + 'static,
    <T as ArrowSerialize>::MutableArrayType: Default,
{
    type MutableArrayType = MutableListArray<i64, <T as ArrowSerialize>::MutableArrayType>;

    /// Creates an empty list array whose child field is named `"item"` and
    /// takes its nullability from `T`.
    #[inline]
    fn new_array() -> Self::MutableArrayType {
        let child = <T as ArrowSerialize>::new_array();
        let child_nullable = <T as ArrowField>::is_nullable();
        Self::MutableArrayType::new_with_field(child, "item", child_nullable)
    }

    /// Serializes every element into the child array, stopping at the first
    /// error, then closes the list slot.
    fn arrow_serialize(
        v: &<Self as ArrowField>::Type,
        array: &mut Self::MutableArrayType,
    ) -> arrow2::error::Result<()> {
        let child = array.mut_values();
        v.iter()
            .try_for_each(|element| <T as ArrowSerialize>::arrow_serialize(element, child))?;
        array.try_push_valid()
    }
}
impl<T, const SIZE: usize> ArrowSerialize for FixedSizeVec<T, SIZE>
where
T: ArrowSerialize + ArrowEnableVecForType + 'static,
<T as ArrowSerialize>::MutableArrayType: Default,
{
type MutableArrayType = MutableFixedSizeListArray<<T as ArrowSerialize>::MutableArrayType>;
#[inline]
fn new_array() -> Self::MutableArrayType {
Self::MutableArrayType::new_with_field(
<T as ArrowSerialize>::new_array(),
"item",
<T as ArrowField>::is_nullable(),
SIZE,
)
}
fn arrow_serialize(
v: &<Self as ArrowField>::Type,
array: &mut Self::MutableArrayType,
) -> arrow2::error::Result<()> {
let values = array.mut_values();
for i in v.iter() {
<T as ArrowSerialize>::arrow_serialize(i, values)?;
}
array.try_push_valid()
}
}
/// Appends every item yielded by `into_iter` to `array`, serializing each one
/// as the arrow type `T`.
///
/// Space is reserved up front using the lower bound of the iterator's size
/// hint. Serialization stops at, and returns, the first error.
fn arrow_serialize_extend_internal<
    'a,
    A: 'static,
    T: ArrowSerialize + ArrowField<Type = A> + 'static,
    I: IntoIterator<Item = &'a A>,
>(
    into_iter: I,
    array: &mut <T as ArrowSerialize>::MutableArrayType,
) -> arrow2::error::Result<()> {
    let mut items = into_iter.into_iter();
    let (lower_bound, _) = items.size_hint();
    array.reserve(lower_bound);
    items.try_for_each(|item| <T as ArrowSerialize>::arrow_serialize(item, array))
}
/// Serializes the items of `into_iter` into a freshly created mutable array
/// for the arrow type `T`.
///
/// # Errors
///
/// Returns the first error produced while serializing an item.
pub fn arrow_serialize_to_mutable_array<
    'a,
    A: 'static,
    T: ArrowSerialize + ArrowField<Type = A> + 'static,
    I: IntoIterator<Item = &'a A>,
>(
    into_iter: I,
) -> arrow2::error::Result<<T as ArrowSerialize>::MutableArrayType> {
    let mut builder = <T as ArrowSerialize>::new_array();
    match arrow_serialize_extend_internal::<A, T, I>(into_iter, &mut builder) {
        Ok(()) => Ok(builder),
        Err(err) => Err(err),
    }
}
/// Conversion of a chunk into a `Chunk<Box<dyn Array>>`, with failure reported
/// through the arrow2 error type.
pub trait FlattenChunk {
/// Consumes `self` and returns the flattened chunk, or an error.
fn flatten(self) -> Result<Chunk<Box<dyn Array>>, arrow2::error::Error>;
}
/// Flattens a chunk holding a single struct array into a chunk of that
/// struct's child arrays.
impl<A> FlattenChunk for Chunk<A>
where
    A: AsRef<dyn Array>,
{
    /// Returns a chunk built from the child arrays of the only array in `self`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidArgumentError` when the chunk does not hold exactly one
    /// array, or when that array's physical type is not `Struct`.
    fn flatten(self) -> Result<Chunk<Box<dyn Array>>, arrow2::error::Error> {
        let columns = self.into_arrays();
        let only = match columns.as_slice() {
            [single] => single.as_ref(),
            _ => {
                return Err(arrow2::error::Error::InvalidArgumentError(
                    "Chunk must contain a single Array".to_string(),
                ))
            }
        };

        match only.data_type().to_physical_type() {
            arrow2::datatypes::PhysicalType::Struct => {}
            _ => {
                return Err(arrow2::error::Error::InvalidArgumentError(
                    "Array in Chunk must be of type arrow2::datatypes::PhysicalType::Struct"
                        .to_string(),
                ))
            }
        }

        // The physical type was checked above, so the downcast is expected to succeed.
        let struct_array = only.as_any().downcast_ref::<StructArray>().unwrap();
        let children = struct_array.values().to_vec();
        Ok(Chunk::new(children))
    }
}
/// Fallible conversion of an iterable of `&Element` into the arrow container
/// `ArrowArray`.
pub trait TryIntoArrow<'a, ArrowArray, Element>
where
Self: IntoIterator<Item = &'a Element>,
Element: 'static,
{
/// Converts `self` into `ArrowArray`, or returns an arrow2 error.
fn try_into_arrow(self) -> arrow2::error::Result<ArrowArray>;
/// Converts `self` into `ArrowArray`, serializing elements as `ArrowType`,
/// whose `ArrowField::Type` must be `Element`.
fn try_into_arrow_as_type<ArrowType>(self) -> arrow2::error::Result<ArrowArray>
where
ArrowType: ArrowSerialize + ArrowField<Type = Element> + 'static;
}
impl<'a, Element, Collection> TryIntoArrow<'a, Arc<dyn Array>, Element> for Collection
where
Element: ArrowSerialize + ArrowField<Type = Element> + 'static,
Collection: IntoIterator<Item = &'a Element>,
{
fn try_into_arrow(self) -> arrow2::error::Result<Arc<dyn Array>> {
Ok(arrow_serialize_to_mutable_array::<Element, Element, Collection>(self)?.as_arc())
}
fn try_into_arrow_as_type<Field>(self) -> arrow2::error::Result<Arc<dyn Array>>
where
Field: ArrowSerialize + ArrowField<Type = Element> + 'static,
{
Ok(arrow_serialize_to_mutable_array::<Element, Field, Collection>(self)?.as_arc())
}
}
impl<'a, Element, Collection> TryIntoArrow<'a, Box<dyn Array>, Element> for Collection
where
Element: ArrowSerialize + ArrowField<Type = Element> + 'static,
Collection: IntoIterator<Item = &'a Element>,
{
fn try_into_arrow(self) -> arrow2::error::Result<Box<dyn Array>> {
Ok(arrow_serialize_to_mutable_array::<Element, Element, Collection>(self)?.as_box())
}
fn try_into_arrow_as_type<E>(self) -> arrow2::error::Result<Box<dyn Array>>
where
E: ArrowSerialize + ArrowField<Type = Element> + 'static,
{
Ok(arrow_serialize_to_mutable_array::<Element, E, Collection>(self)?.as_box())
}
}
impl<'a, Element, Collection> TryIntoArrow<'a, Chunk<Arc<dyn Array>>, Element> for Collection
where
Element: ArrowSerialize + ArrowField<Type = Element> + 'static,
Collection: IntoIterator<Item = &'a Element>,
{
fn try_into_arrow(self) -> arrow2::error::Result<Chunk<Arc<dyn Array>>> {
Ok(Chunk::new(vec![arrow_serialize_to_mutable_array::<
Element,
Element,
Collection,
>(self)?
.as_arc()]))
}
fn try_into_arrow_as_type<Field>(self) -> arrow2::error::Result<Chunk<Arc<dyn Array>>>
where
Field: ArrowSerialize + ArrowField<Type = Element> + 'static,
{
Ok(Chunk::new(vec![arrow_serialize_to_mutable_array::<
Element,
Field,
Collection,
>(self)?
.as_arc()]))
}
}
impl<'a, Element, Collection> TryIntoArrow<'a, Chunk<Box<dyn Array>>, Element> for Collection
where
Element: ArrowSerialize + ArrowField<Type = Element> + 'static,
Collection: IntoIterator<Item = &'a Element>,
{
fn try_into_arrow(self) -> arrow2::error::Result<Chunk<Box<dyn Array>>> {
Ok(Chunk::new(vec![arrow_serialize_to_mutable_array::<
Element,
Element,
Collection,
>(self)?
.as_box()]))
}
fn try_into_arrow_as_type<E>(self) -> arrow2::error::Result<Chunk<Box<dyn Array>>>
where
E: ArrowSerialize + ArrowField<Type = Element> + 'static,
{
Ok(Chunk::new(vec![arrow_serialize_to_mutable_array::<
Element,
E,
Collection,
>(self)?
.as_box()]))
}
}