use bytes::{Buf, Bytes};
use std::fs::File;
use std::io::{BufReader, Seek, SeekFrom};
use std::{io::Read, sync::Arc};
use crate::bloom_filter::Sbbf;
use crate::column::page::PageIterator;
use crate::column::{page::PageReader, reader::ColumnReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::*;
pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
use crate::record::reader::RowIter;
use crate::schema::types::Type as SchemaType;
use crate::basic::Type;
use crate::column::reader::ColumnReaderImpl;
/// Reports the total length of a data source as a `u64`.
///
/// In this file it is implemented for [`File`] (length taken from the file
/// metadata) and for [`Bytes`] (length of the buffer).
// The trait deliberately exposes only `len`, so the clippy lint that asks for a
// matching `is_empty` is silenced.
#[allow(clippy::len_without_is_empty)]
pub trait Length {
/// Returns the length of the underlying source.
fn len(&self) -> u64;
}
/// A source of data that can be read from an arbitrary offset.
///
/// Implementors must also implement [`Length`] and be `Send + Sync`.
pub trait ChunkReader: Length + Send + Sync {
/// The concrete [`Read`] implementation handed out by [`Self::get_read`].
type T: Read;
/// Returns a reader positioned at byte offset `start`.
///
/// The implementations in this file return an error if the offset cannot be
/// reached (seek failure for `File`, offset past the end for `Bytes`).
fn get_read(&self, start: u64) -> Result<Self::T>;
/// Returns `length` bytes beginning at byte offset `start`.
///
/// The implementations in this file return an EOF error when fewer than
/// `length` bytes are available from `start`.
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
}
impl Length for File {
    /// Returns the file size reported by the file's metadata.
    ///
    /// If the metadata cannot be queried the error is discarded and `0` is
    /// returned, because the trait signature has no way to report a failure.
    fn len(&self) -> u64 {
        match self.metadata() {
            Ok(meta) => meta.len(),
            Err(_) => 0u64,
        }
    }
}
impl ChunkReader for File {
    type T = BufReader<File>;

    /// Returns a buffered reader positioned at byte offset `start`.
    ///
    /// The reader wraps a handle obtained from [`File::try_clone`]. Per the
    /// standard library documentation such a handle shares its cursor with
    /// `self`, so seeks and reads through the returned reader also move the
    /// position of the original `File` (and of any other outstanding clones).
    ///
    /// # Errors
    ///
    /// Returns an error if the handle cannot be cloned or the seek fails.
    fn get_read(&self, start: u64) -> Result<Self::T> {
        let mut reader = self.try_clone()?;
        reader.seek(SeekFrom::Start(start))?;
        // Return the handle that was just positioned. Cloning a second handle
        // here would cost an extra dup/close and would only be correct because
        // clones happen to share the cursor.
        Ok(BufReader::new(reader))
    }

    /// Reads exactly `length` bytes starting at byte offset `start`.
    ///
    /// Like [`Self::get_read`], this operates on a cloned handle that shares
    /// its cursor with `self`.
    ///
    /// # Errors
    ///
    /// Returns an error if cloning, seeking or reading fails, and an EOF error
    /// if the file ends before `length` bytes could be read.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let mut buffer = Vec::with_capacity(length);
        let mut reader = self.try_clone()?;
        reader.seek(SeekFrom::Start(start))?;
        // `take` caps the read at `length`, so `read` can only be short, never long.
        let read = reader.take(length as _).read_to_end(&mut buffer)?;
        if read != length {
            return Err(eof_err!(
                "Expected to read {} bytes, read only {}",
                length,
                read
            ));
        }
        Ok(buffer.into())
    }
}
impl Length for Bytes {
fn len(&self) -> u64 {
self.len() as u64
}
}
impl ChunkReader for Bytes {
    type T = bytes::buf::Reader<Bytes>;

    /// Returns a reader over the bytes from offset `start` to the end of the buffer.
    ///
    /// An offset equal to the buffer length is accepted and yields an empty reader.
    ///
    /// # Errors
    ///
    /// Returns an EOF error if `start` lies beyond the end of the buffer.
    fn get_read(&self, start: u64) -> Result<Self::T> {
        let file_len = self.len();
        // Compare in `u64` so that an offset too large for `usize` (32-bit
        // targets) is rejected instead of being silently truncated.
        if start > file_len as u64 {
            return Err(eof_err!(
                "Expected to read at offset {start}, while file has length {}",
                file_len
            ));
        }
        // `start <= file_len`, so the narrowing conversion is lossless.
        let offset = start as usize;
        Ok(self.slice(offset..).reader())
    }

    /// Returns the `length` bytes starting at offset `start` as a slice of this buffer.
    ///
    /// # Errors
    ///
    /// Returns an EOF error if the requested range does not fit inside the
    /// buffer, including when `start + length` would overflow `usize`.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let file_len = self.len();
        let range = if start <= file_len as u64 {
            // `start <= file_len`, so the narrowing conversion is lossless.
            let begin = start as usize;
            // `checked_add` turns an overflowing end offset into an error
            // rather than a panic (debug) or a wrapped, bogus bound (release).
            begin
                .checked_add(length)
                .filter(|&end| end <= file_len)
                .map(|end| begin..end)
        } else {
            None
        };
        match range {
            Some(range) => Ok(self.slice(range)),
            None => Err(eof_err!(
                "Expected to read {} bytes at offset {}, while file has length {}",
                length,
                start,
                file_len
            )),
        }
    }
}
/// File-level reader interface; implementors must be `Send + Sync`.
///
/// [`FilePageIterator`] in this file drives a `dyn FileReader` through
/// `metadata()` and `get_row_group()`.
pub trait FileReader: Send + Sync {
/// Returns a reference to the file's [`ParquetMetaData`].
fn metadata(&self) -> &ParquetMetaData;
/// Returns the number of row groups.
// NOTE(review): presumably equal to `metadata().num_row_groups()` — confirm with implementors.
fn num_row_groups(&self) -> usize;
/// Returns a reader for the row group at index `i`.
///
/// The returned box may borrow from `self` (`+ '_`).
fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>>;
/// Returns a [`RowIter`] that borrows from `self`.
// NOTE(review): `projection` presumably restricts the columns that are read,
// with `None` meaning the full schema — confirm against implementors.
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>>;
}
pub trait RowGroupReader: Send + Sync {
fn metadata(&self) -> &RowGroupMetaData;
fn num_columns(&self) -> usize;
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;
fn get_column_reader(&self, i: usize) -> Result<ColumnReader> {
let schema_descr = self.metadata().schema_descr();
let col_descr = schema_descr.column(i);
let col_page_reader = self.get_column_page_reader(i)?;
let col_reader = match col_descr.physical_type() {
Type::BOOLEAN => {
ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
}
Type::INT32 => {
ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
}
Type::INT64 => {
ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
}
Type::INT96 => {
ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
}
Type::FLOAT => {
ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
}
Type::DOUBLE => {
ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
}
Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
ColumnReaderImpl::new(col_descr, col_page_reader),
),
};
Ok(col_reader)
}
fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>>;
}
/// Iterator that yields one page reader per row group for a single column of
/// a file.
pub struct FilePageIterator {
// Index of the column whose pages are read in every row group.
column_index: usize,
// Row group indices still to be visited, consumed in iteration order.
row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
// Shared handle to the file the row groups are fetched from.
file_reader: Arc<dyn FileReader>,
}
impl FilePageIterator {
    /// Creates an iterator over column `column_index` that visits every row
    /// group of the file, from index `0` up to the row group count reported by
    /// the file metadata.
    ///
    /// # Errors
    ///
    /// See [`Self::with_row_groups`].
    pub fn new(column_index: usize, file_reader: Arc<dyn FileReader>) -> Result<Self> {
        let every_row_group = 0..file_reader.metadata().num_row_groups();
        Self::with_row_groups(column_index, Box::new(every_row_group), file_reader)
    }

    /// Creates an iterator over column `column_index` that visits only the row
    /// groups produced by `row_group_indices`, in the order they are produced.
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::IndexOutOfBound`] if `column_index` is not
    /// smaller than the number of columns in the file schema.
    pub fn with_row_groups(
        column_index: usize,
        row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
        file_reader: Arc<dyn FileReader>,
    ) -> Result<Self> {
        let file_meta = file_reader.metadata().file_metadata();
        let num_columns = file_meta.schema_descr().num_columns();

        // Validate the column up front so iteration never has to.
        if column_index < num_columns {
            Ok(Self {
                column_index,
                row_group_indices,
                file_reader,
            })
        } else {
            Err(ParquetError::IndexOutOfBound(column_index, num_columns))
        }
    }
}
impl Iterator for FilePageIterator {
    type Item = Result<Box<dyn PageReader>>;

    /// Advances to the next row group index and returns the page reader for
    /// the configured column in that row group.
    ///
    /// Yields `None` once the row group indices are exhausted. A failure to
    /// open the row group or the column's page reader is yielded as `Some(Err(_))`.
    fn next(&mut self) -> Option<Result<Box<dyn PageReader>>> {
        let row_group_index = self.row_group_indices.next()?;
        let page_reader = match self.file_reader.get_row_group(row_group_index) {
            Ok(row_group) => row_group.get_column_page_reader(self.column_index),
            Err(err) => Err(err),
        };
        Some(page_reader)
    }
}
// Empty impl: no items are defined here, so `FilePageIterator` takes part in
// `PageIterator` purely through whatever that trait itself provides or requires.
impl PageIterator for FilePageIterator {}
#[cfg(test)]
mod tests {
    use super::*;

    /// An offset past the end of a `Bytes` buffer must be reported as an EOF error.
    #[test]
    fn test_bytes_chunk_reader_get_read_out_of_bounds() {
        let buf = Bytes::from(vec![0, 1, 2, 3]);
        let message = buf.get_read(5).unwrap_err().to_string();
        assert_eq!(
            message,
            "EOF: Expected to read at offset 5, while file has length 4"
        );
    }

    /// Both a start offset past the end and a range running past the end must
    /// be reported as EOF errors.
    #[test]
    fn test_bytes_chunk_reader_get_bytes_out_of_bounds() {
        let buf = Bytes::from(vec![0, 1, 2, 3]);
        let error_text =
            |start: u64, length: usize| buf.get_bytes(start, length).unwrap_err().to_string();

        // Start offset beyond the buffer.
        assert_eq!(
            error_text(5, 1),
            "EOF: Expected to read 1 bytes at offset 5, while file has length 4"
        );
        // Start inside the buffer, end beyond it.
        assert_eq!(
            error_text(2, 3),
            "EOF: Expected to read 3 bytes at offset 2, while file has length 4"
        );
    }
}