use crate::arrow::array_reader::ArrayReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
/// [`ArrayReader`] that produces the file-wide row number of each row as an
/// `Int64` column, instead of reading any column data.
///
/// Row numbers are absolute positions within the file (computed from the
/// cumulative `num_rows` of all row groups in file order), emitted in the
/// order the row groups were selected in [`RowNumberReader::try_new`].
pub(crate) struct RowNumberReader {
// Row numbers already pulled by `read_records` but not yet returned to the
// caller via `consume_batch` (which drains this buffer).
buffered_row_numbers: Vec<i64>,
// Remaining row numbers to emit: one contiguous `Range<i64>` per selected
// row group, flattened into a single iterator in selection order.
remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
}
impl RowNumberReader {
    /// Creates a reader that yields the absolute (file-wide) row number of
    /// every row in `row_groups`, in the order the groups are supplied.
    ///
    /// Each row group's starting row number is derived from the full file
    /// metadata: the sum of `num_rows` of all row groups preceding it in
    /// file order.
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::General`] if a selected row group has no
    /// ordinal, or if its ordinal does not appear in `parquet_metadata`.
    pub(crate) fn try_new<'a>(
        parquet_metadata: &'a ParquetMetaData,
        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
    ) -> Result<Self> {
        // First pass over the whole file: map each row group ordinal to the
        // index of its first row, accumulating row counts in file order.
        let mut offset_by_ordinal: HashMap<i16, i64> = HashMap::new();
        let mut next_row: i64 = 0;
        for group in parquet_metadata.row_groups() {
            if let Some(ordinal) = group.ordinal() {
                offset_by_ordinal.insert(ordinal, next_row);
            }
            next_row += group.num_rows();
        }
        // Second pass over the selection: translate each chosen group into
        // its half-open range of absolute row numbers, failing fast if the
        // ordinal is missing or unknown.
        let ranges = row_groups
            .map(|group| {
                let ordinal = match group.ordinal() {
                    Some(ordinal) => ordinal,
                    None => {
                        return Err(ParquetError::General(
                            "Row group missing ordinal field, required to compute row numbers"
                                .to_string(),
                        ))
                    }
                };
                let first_row = offset_by_ordinal.get(&ordinal).copied().ok_or_else(|| {
                    ParquetError::General(format!(
                        "Row group with ordinal {} not found in metadata",
                        ordinal
                    ))
                })?;
                Ok(first_row..first_row + group.num_rows())
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            buffered_row_numbers: Vec::new(),
            remaining_row_numbers: ranges.into_iter().flatten(),
        })
    }
}
impl ArrayReader for RowNumberReader {
    /// Pulls up to `batch_size` row numbers into the internal buffer and
    /// returns how many were actually buffered (fewer at end of input).
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let before = self.buffered_row_numbers.len();
        let batch = (&mut self.remaining_row_numbers).take(batch_size);
        self.buffered_row_numbers.extend(batch);
        Ok(self.buffered_row_numbers.len() - before)
    }

    /// Discards up to `num_records` pending row numbers and returns the
    /// number actually discarded.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        let mut skipped = 0;
        while skipped < num_records && self.remaining_row_numbers.next().is_some() {
            skipped += 1;
        }
        Ok(skipped)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Row numbers are always produced as a non-nullable-valued Int64 column.
    fn get_data_type(&self) -> &DataType {
        &DataType::Int64
    }

    /// Drains everything buffered by prior `read_records` calls into a
    /// single `Int64Array`, leaving the buffer empty for reuse.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let array = Int64Array::from_iter(self.buffered_row_numbers.drain(..));
        Ok(Arc::new(array))
    }

    // Row numbers are a flat, required column: no definition levels.
    fn get_def_levels(&self) -> Option<&[i16]> {
        None
    }

    // ... and no repetition levels either.
    fn get_rep_levels(&self) -> Option<&[i16]> {
        None
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::basic::Type as PhysicalType;
    use crate::file::metadata::{
        ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
    };
    use crate::schema::types::{SchemaDescriptor, Type as SchemaType};
    use std::sync::Arc;

    /// Builds a minimal one-column schema (`test_col`: INT32) for tests.
    fn test_schema() -> Arc<SchemaDescriptor> {
        let root = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(
                SchemaType::primitive_type_builder("test_col", PhysicalType::INT32)
                    .build()
                    .unwrap(),
            )])
            .build()
            .unwrap();
        Arc::new(SchemaDescriptor::new(Arc::new(root)))
    }

    /// Builds file metadata with one row group per `(ordinal, num_rows)` pair,
    /// in the given (file) order.
    fn test_parquet_metadata(groups: Vec<(i16, i64)>) -> ParquetMetaData {
        let schema_descr = test_schema();
        let row_group_metas: Vec<RowGroupMetaData> = groups
            .into_iter()
            .map(|(ordinal, num_rows)| {
                let columns: Vec<_> = schema_descr
                    .columns()
                    .iter()
                    .map(|col| ColumnChunkMetaData::builder(col.clone()).build().unwrap())
                    .collect();
                RowGroupMetaData::builder(schema_descr.clone())
                    .set_num_rows(num_rows)
                    .set_ordinal(ordinal)
                    .set_total_byte_size(100)
                    .set_column_metadata(columns)
                    .build()
                    .unwrap()
            })
            .collect();
        let total_rows: i64 = row_group_metas.iter().map(RowGroupMetaData::num_rows).sum();
        let file_metadata = FileMetaData::new(1, total_rows, None, None, schema_descr, None);
        ParquetMetaData::new(file_metadata, row_group_metas)
    }

    #[test]
    fn test_row_number_reader_reverse_order() {
        // Three row groups of two rows each; select the last and the first,
        // in that order, and expect row numbers in selection order.
        let metadata = test_parquet_metadata(vec![(0, 2), (1, 2), (2, 2)]);
        let selected = [&metadata.row_groups()[2], &metadata.row_groups()[0]];

        let mut reader = RowNumberReader::try_new(&metadata, selected.into_iter()).unwrap();

        // Ask for more rows than are selected; only 4 are available.
        let num_read = reader.read_records(6).unwrap();
        assert_eq!(num_read, 4);

        let array = reader.consume_batch().unwrap();
        let row_numbers = array.as_any().downcast_ref::<Int64Array>().unwrap();
        let actual: Vec<i64> = row_numbers.iter().map(|v| v.unwrap()).collect();
        assert_eq!(
            actual,
            vec![4, 5, 0, 1],
            "Row numbers should match the order of selected row groups, not file order"
        );
    }
}