use bytes::Bytes;
use std::{boxed::Box, io::Read, sync::Arc};
use crate::bloom_filter::Sbbf;
use crate::column::page::PageIterator;
use crate::column::{page::PageReader, reader::ColumnReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::*;
pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
use crate::record::reader::RowIter;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, Type as SchemaType};
use crate::basic::Type;
use crate::column::reader::ColumnReaderImpl;
/// Reports the total length of a readable source.
// This trait intentionally exposes only `len`. The clippy lint that would
// otherwise require a companion `is_empty` method is silenced for that reason.
#[allow(clippy::len_without_is_empty)]
pub trait Length {
    /// Returns the total length of this source.
    ///
    /// NOTE(review): presumably measured in bytes, matching the `u64` byte
    /// offsets accepted by `ChunkReader::get_read` — confirm.
    fn len(&self) -> u64;
}
/// Source of byte ranges that can be read independently, each starting at an
/// arbitrary offset.
pub trait ChunkReader: Length + Send + Sync {
    /// Reader type returned by [`ChunkReader::get_read`].
    type T: Read + Send;

    /// Returns a reader positioned at offset `start` that is expected to yield
    /// `length` bytes.
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;

    /// Reads exactly `length` bytes beginning at offset `start` into a [`Bytes`] buffer.
    ///
    /// # Errors
    ///
    /// Returns an EOF error if fewer than `length` bytes are available. Propagates
    /// any error from [`ChunkReader::get_read`] or from the underlying read.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let mut buffer = Vec::with_capacity(length);
        // Bound the read to exactly `length` bytes. An implementation whose reader
        // runs past the requested range must not trigger an unbounded read, and the
        // short-read check below then only fires on genuine truncation. The
        // `usize` -> `u64` conversion is lossless on every target Rust supports.
        let read = self
            .get_read(start, length)?
            .take(length as u64)
            .read_to_end(&mut buffer)?;
        if read != length {
            return Err(eof_err!(
                "Expected to read {} bytes, read only {}",
                length,
                read
            ));
        }
        Ok(buffer.into())
    }
}
/// Reader for a whole Parquet file, exposing its metadata, its row groups and
/// row-level iteration.
pub trait FileReader: Send + Sync {
    /// Returns the metadata describing the entire file.
    fn metadata(&self) -> &ParquetMetaData;

    /// Returns the number of row groups in the file.
    fn num_row_groups(&self) -> usize;

    /// Returns a reader for the `i`-th row group. The returned reader may borrow
    /// from `self`.
    ///
    /// NOTE(review): behavior for an out-of-range `i` is left to the
    /// implementation — confirm whether it errors or panics.
    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>>;

    /// Returns an iterator over the rows of the file.
    ///
    /// NOTE(review): `projection` presumably selects a subset of columns via a
    /// schema; `None` presumably means all columns — confirm.
    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
}
pub trait RowGroupReader: Send + Sync {
fn metadata(&self) -> &RowGroupMetaData;
fn num_columns(&self) -> usize;
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;
fn get_column_reader(&self, i: usize) -> Result<ColumnReader> {
let schema_descr = self.metadata().schema_descr();
let col_descr = schema_descr.column(i);
let col_page_reader = self.get_column_page_reader(i)?;
let col_reader = match col_descr.physical_type() {
Type::BOOLEAN => ColumnReader::BoolColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::INT32 => ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::INT64 => ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::INT96 => ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::FLOAT => ColumnReader::FloatColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::DOUBLE => ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader(
ColumnReaderImpl::new(col_descr, col_page_reader),
),
Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
ColumnReaderImpl::new(col_descr, col_page_reader),
),
};
Ok(col_reader)
}
fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
}
/// Iterator that yields one page reader for a fixed column from each row group
/// in a sequence of row groups of a file.
pub struct FilePageIterator {
    // Index of the column whose pages are produced. It is checked against the
    // schema's column count when the iterator is constructed.
    column_index: usize,
    // Row groups to visit, consumed lazily in the order they are yielded.
    row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
    // Shared reader from which each row group is opened.
    file_reader: Arc<dyn FileReader>,
}
impl FilePageIterator {
    /// Builds an iterator over column `column_index` that visits every row group
    /// of `file_reader`, from first to last.
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::IndexOutOfBound`] when `column_index` is not below
    /// the schema's column count.
    pub fn new(column_index: usize, file_reader: Arc<dyn FileReader>) -> Result<Self> {
        let every_row_group = 0..file_reader.metadata().num_row_groups();
        Self::with_row_groups(column_index, Box::new(every_row_group), file_reader)
    }

    /// Builds an iterator over column `column_index` that visits only the row
    /// groups yielded by `row_group_indices`, in that order.
    ///
    /// Only the column index is validated here. The row-group indices are
    /// consumed lazily during iteration.
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::IndexOutOfBound`] when `column_index` is not below
    /// the schema's column count.
    pub fn with_row_groups(
        column_index: usize,
        row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
        file_reader: Arc<dyn FileReader>,
    ) -> Result<Self> {
        let column_count = file_reader
            .metadata()
            .file_metadata()
            .schema_descr()
            .num_columns();
        if column_index < column_count {
            Ok(Self {
                column_index,
                row_group_indices,
                file_reader,
            })
        } else {
            Err(ParquetError::IndexOutOfBound(column_index, column_count))
        }
    }
}
impl Iterator for FilePageIterator {
type Item = Result<Box<dyn PageReader>>;
fn next(&mut self) -> Option<Result<Box<dyn PageReader>>> {
self.row_group_indices.next().map(|row_group_index| {
self.file_reader
.get_row_group(row_group_index)
.and_then(|r| r.get_column_page_reader(self.column_index))
})
}
}
impl PageIterator for FilePageIterator {
    /// Returns a shared pointer to the file-level schema descriptor of the
    /// underlying reader. This implementation never fails.
    fn schema(&mut self) -> Result<SchemaDescPtr> {
        let file_metadata = self.file_reader.metadata().file_metadata();
        let schema_ptr = file_metadata.schema_descr_ptr();
        Ok(schema_ptr)
    }

    /// Returns the descriptor of the column this iterator reads. The column
    /// index was checked against the schema's column count at construction.
    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
        let schema = self.schema()?;
        let column = schema.column(self.column_index);
        Ok(column)
    }
}