use crate::errors::Result;
use arrow_array::ArrayRef;
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::{FilePageIterator, FileReader};
mod builder;
mod byte_array;
mod byte_array_dictionary;
mod byte_view_array;
mod cached_array_reader;
mod empty_array;
mod fixed_len_byte_array;
mod fixed_size_list_array;
mod list_array;
mod list_view_array;
mod map_array;
mod null_array;
mod primitive_array;
mod row_group_cache;
mod row_group_index;
mod row_number;
mod struct_array;
#[cfg(test)]
mod test_util;
use crate::file::metadata::RowGroupMetaData;
pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
#[allow(unused_imports)] pub use byte_view_array::make_byte_view_array_reader;
#[allow(unused_imports)] pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
pub use fixed_size_list_array::FixedSizeListArrayReader;
pub use list_array::ListArrayReader;
pub use list_view_array::ListViewArrayReader;
pub use map_array::MapArrayReader;
pub use null_array::NullArrayReader;
pub use primitive_array::PrimitiveArrayReader;
pub use row_group_cache::RowGroupCache;
pub use struct_array::StructArrayReader;
/// A reader that produces Arrow arrays ([`ArrayRef`]) from batches of records.
///
/// Implementors must be [`Send`], which is a supertrait bound of this trait.
pub trait ArrayReader: Send {
    /// Returns this reader as `&dyn Any`, so callers can attempt to downcast it
    /// to a concrete reader type.
    // NOTE(review): `dead_code` is allowed here presumably because `as_any` has no
    // caller in some build configurations; confirm and record the reason.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;

    /// Returns the Arrow data type of this reader (presumably the type of the
    /// arrays returned by `consume_batch`).
    fn get_data_type(&self) -> &ArrowType;

    /// Reads up to `batch_size` records and returns them as a single array.
    ///
    /// The default implementation calls `read_records(batch_size)` and then
    /// `consume_batch()`. This method is only compiled with the `experimental`
    /// feature or in tests.
    ///
    /// # Errors
    /// Propagates any error from `read_records` or `consume_batch`.
    #[cfg(any(feature = "experimental", test))]
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        self.read_records(batch_size)?;
        self.consume_batch()
    }

    /// Reads up to `batch_size` records and returns the number of records read.
    ///
    /// The records are not returned by this call. The default `next_batch`
    /// obtains them with a following call to `consume_batch`.
    fn read_records(&mut self, batch_size: usize) -> Result<usize>;

    /// Returns the records gathered by preceding `read_records` calls as an
    /// [`ArrayRef`].
    // NOTE(review): presumably this also resets the internal buffer so the next
    // batch starts empty; confirm against implementors.
    fn consume_batch(&mut self) -> Result<ArrayRef>;

    /// Skips up to `num_records` records and returns the number actually skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize>;

    /// Returns the definition levels associated with this reader's output, or
    /// `None` if there are none.
    // NOTE(review): which batch these levels describe is not visible here;
    // presumably the most recent `consume_batch`. Confirm.
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// Returns the repetition levels associated with this reader's output, or
    /// `None` if there are none.
    // NOTE(review): same batch-association caveat as `get_def_levels`.
    fn get_rep_levels(&self) -> Option<&[i16]>;
}
/// A source of row groups whose column chunks can be read as a sequence of pages.
pub trait RowGroups {
    /// Returns the total number of rows.
    fn num_rows(&self) -> usize;

    /// Returns a [`PageIterator`] over the pages of the column at index `i`.
    ///
    /// # Errors
    /// Returns an error if the page iterator cannot be created.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;

    /// Returns an iterator over the metadata of each row group.
    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;

    /// Returns the Parquet metadata of the underlying source.
    fn metadata(&self) -> &ParquetMetaData;
}
/// Exposes a shared [`FileReader`] as a [`RowGroups`] source. Every method
/// ultimately reads from the reader's own metadata.
impl RowGroups for Arc<dyn FileReader> {
    /// Row count recorded in the file-level metadata.
    fn num_rows(&self) -> usize {
        let file_meta = <Self as RowGroups>::metadata(self).file_metadata();
        file_meta.num_rows() as usize
    }

    /// Builds a [`FilePageIterator`] for `column_index`. The iterator holds its
    /// own `Arc` handle to this reader.
    fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
        let shared_reader = Arc::clone(self);
        Ok(Box::new(FilePageIterator::new(column_index, shared_reader)?))
    }

    /// Iterates over the row-group metadata stored in the file metadata.
    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        let groups = <Self as RowGroups>::metadata(self).row_groups();
        Box::new(groups.iter())
    }

    /// Forwards to [`FileReader::metadata`] on the wrapped reader.
    fn metadata(&self) -> &ParquetMetaData {
        FileReader::metadata(self.as_ref())
    }
}
/// Reads up to `batch_size` records into `record_reader`.
///
/// Whenever a read returns fewer records than were requested, the next item from
/// `pages` is installed as the record reader's page reader and reading continues.
///
/// Returns the total number of records read. The total is less than
/// `batch_size` only when `pages` has no more items.
///
/// # Errors
/// Propagates errors from reading records, from the page iterator, and from
/// installing a new page reader.
fn read_records<V, CV>(
    record_reader: &mut GenericRecordReader<V, CV>,
    pages: &mut dyn PageIterator,
    batch_size: usize,
) -> Result<usize>
where
    V: ValuesBuffer,
    CV: ColumnValueDecoder<Buffer = V>,
{
    let mut total = 0usize;
    while total < batch_size {
        let wanted = batch_size - total;
        let got = record_reader.read_records(wanted)?;
        total += got;
        if got >= wanted {
            continue;
        }
        // Fewer records than requested: advance to the next page reader, if any.
        match pages.next() {
            Some(next_page) => {
                record_reader.set_page_reader(next_page?)?;
            }
            None => break,
        }
    }
    Ok(total)
}
/// Skips up to `batch_size` records in `record_reader`.
///
/// Whenever a skip returns fewer records than were requested, the next item from
/// `pages` is installed as the record reader's page reader and skipping continues.
///
/// Returns the total number of records skipped. The total is less than
/// `batch_size` only when `pages` has no more items.
///
/// # Errors
/// Propagates errors from skipping records, from the page iterator, and from
/// installing a new page reader.
fn skip_records<V, CV>(
    record_reader: &mut GenericRecordReader<V, CV>,
    pages: &mut dyn PageIterator,
    batch_size: usize,
) -> Result<usize>
where
    V: ValuesBuffer,
    CV: ColumnValueDecoder<Buffer = V>,
{
    let mut skipped = 0usize;
    while skipped < batch_size {
        let remaining = batch_size - skipped;
        let skipped_now = record_reader.skip_records(remaining)?;
        skipped += skipped_now;
        if skipped_now >= remaining {
            continue;
        }
        // Fewer records than requested: advance to the next page reader, if any.
        match pages.next() {
            Some(next_page) => {
                record_reader.set_page_reader(next_page?)?;
            }
            None => break,
        }
    }
    Ok(skipped)
}