use crate::arrow::array_reader::{ArrayReader, ListArrayReader};
use crate::errors::Result;
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, GenericListViewArray, OffsetSizeTrait, new_empty_array};
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;
/// An [`ArrayReader`] that produces `ListView` / `LargeListView` arrays.
///
/// All reading is delegated to a wrapped [`ListArrayReader`] constructed for
/// the corresponding `List` / `LargeList` type; each consumed batch is then
/// converted into a [`GenericListViewArray`].
pub struct ListViewArrayReader<OffsetSize>
where
    OffsetSize: OffsetSizeTrait,
{
    /// Reader for the equivalent list type; performs the actual reading.
    inner: ListArrayReader<OffsetSize>,
    /// The list-view type this reader was created with, as reported by
    /// `get_data_type` and checked against every converted batch.
    data_type: ArrowType,
}
impl<OffsetSize: OffsetSizeTrait> ListViewArrayReader<OffsetSize> {
    /// Creates a list-view reader on top of `item_reader`.
    ///
    /// `data_type` must be `ListView` or `LargeListView`. It is mapped to the
    /// `List` / `LargeList` type with the same child field, and that type is
    /// used to build the wrapped [`ListArrayReader`]. `item_reader`,
    /// `def_level`, `rep_level` and `nullable` are forwarded to
    /// [`ListArrayReader::new`] unchanged.
    ///
    /// # Panics
    ///
    /// Panics if `data_type` is neither `ListView` nor `LargeListView`.
    pub fn new(
        item_reader: Box<dyn ArrayReader>,
        data_type: ArrowType,
        def_level: i16,
        rep_level: i16,
        nullable: bool,
    ) -> Self {
        let list_data_type = match &data_type {
            ArrowType::ListView(f) => ArrowType::List(f.clone()),
            ArrowType::LargeListView(f) => ArrowType::LargeList(f.clone()),
            // This is a public constructor, so a wrong type is caller misuse:
            // report which type was received instead of panicking silently.
            other => unreachable!(
                "ListViewArrayReader requires a ListView or LargeListView data type, got {:?}",
                other
            ),
        };
        let inner =
            ListArrayReader::new(item_reader, list_data_type, def_level, rep_level, nullable);
        Self { inner, data_type }
    }
}
impl<OffsetSize> ArrayReader for ListViewArrayReader<OffsetSize>
where
    OffsetSize: OffsetSizeTrait,
{
    /// Returns this reader as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the list-view data type this reader was constructed with.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Forwards the read request to the wrapped list reader.
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        self.inner.read_records(batch_size)
    }

    /// Consumes the wrapped reader's batch and returns it as a list-view array.
    ///
    /// An empty batch yields an empty array of this reader's data type.
    /// Otherwise the list array is converted into a [`GenericListViewArray`].
    ///
    /// # Panics
    ///
    /// Panics if the converted array's data type differs from the data type
    /// this reader was constructed with.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let lists = self.inner.consume_batch()?;
        let converted: ArrayRef = if lists.is_empty() {
            new_empty_array(&self.data_type)
        } else {
            // `as_list` hands back a reference while the conversion takes an
            // owned list array, hence the clone.
            let views =
                GenericListViewArray::<OffsetSize>::from(lists.as_list::<OffsetSize>().clone());
            assert_eq!(
                views.data_type(),
                &self.data_type,
                "Converted array type does not match expected type"
            );
            Arc::new(views)
        };
        Ok(converted)
    }

    /// Forwards the skip request to the wrapped list reader.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        self.inner.skip_records(num_records)
    }

    /// Returns the definition levels exposed by the wrapped list reader.
    fn get_def_levels(&self) -> Option<&[i16]> {
        self.inner.get_def_levels()
    }

    /// Returns the repetition levels exposed by the wrapped list reader.
    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.inner.get_rep_levels()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
    use arrow::datatypes::Int32Type as ArrowInt32;
    use arrow_array::PrimitiveArray;

    /// Builds the list-view type under test: `LargeListView` when
    /// `OffsetSize::IS_LARGE`, otherwise `ListView`, over a nullable `Int32`
    /// list field.
    fn list_view_type<OffsetSize: OffsetSizeTrait>() -> ArrowType {
        let field = Arc::new(arrow_schema::Field::new_list_field(ArrowType::Int32, true));
        if OffsetSize::IS_LARGE {
            ArrowType::LargeListView(field)
        } else {
            ArrowType::ListView(field)
        }
    }

    /// Wraps `item_reader` in a `ListViewArrayReader`, reads one batch of up
    /// to 1024 records and asserts that it equals `expected`.
    fn assert_reads_as<OffsetSize: OffsetSizeTrait>(
        item_reader: Box<dyn ArrayReader>,
        def_level: i16,
        rep_level: i16,
        nullable: bool,
        expected: &GenericListViewArray<OffsetSize>,
    ) {
        let mut list_view_array_reader = ListViewArrayReader::<OffsetSize>::new(
            item_reader,
            list_view_type::<OffsetSize>(),
            def_level,
            rep_level,
            nullable,
        );
        let actual = list_view_array_reader.next_batch(1024).unwrap();
        let actual = actual
            .as_any()
            .downcast_ref::<GenericListViewArray<OffsetSize>>()
            .unwrap();
        assert_eq!(expected, actual);
    }

    /// Nullable list of nullable items.
    fn test_nullable_list_view<OffsetSize: OffsetSizeTrait>() {
        // [[1, null, 2], null, [], [3, 4], [], [], null, [], [null, 1]]
        let expected =
            GenericListViewArray::<OffsetSize>::from_iter_primitive::<ArrowInt32, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                None,
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                None,
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            None,
            None,
            Some(1),
        ]));

        // As this fixture is laid out: definition level 0 lines up with a null
        // list, 1 with an empty list, 2 with a null item and 3 with a value;
        // repetition level 0 begins a new list and 1 continues the current one.
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]),
            Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]),
        );

        assert_reads_as::<OffsetSize>(Box::new(item_array_reader), 2, 1, true, &expected);
    }

    /// Required (non-nullable) list of nullable items.
    fn test_required_list_view<OffsetSize: OffsetSizeTrait>() {
        // [[1, null, 2], [], [3, 4], [], [], [null, 1]]
        let expected =
            GenericListViewArray::<OffsetSize>::from_iter_primitive::<ArrowInt32, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            Some(1),
        ]));

        // As this fixture is laid out: definition level 0 lines up with an
        // empty list, 1 with a null item and 2 with a value; repetition level 0
        // begins a new list and 1 continues the current one.
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]),
            Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]),
        );

        assert_reads_as::<OffsetSize>(Box::new(item_array_reader), 1, 1, false, &expected);
    }

    /// Runs both list-view scenarios for the given offset width.
    fn test_list_view_array<OffsetSize: OffsetSizeTrait>() {
        test_nullable_list_view::<OffsetSize>();
        test_required_list_view::<OffsetSize>();
    }

    #[test]
    fn test_list_view_array_reader() {
        test_list_view_array::<i32>();
    }

    #[test]
    fn test_large_list_view_array_reader() {
        test_list_view_array::<i64>();
    }
}