use datafusion_common::{internal_err, Result};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;
/// Describes how each row group of a Parquet file should be accessed.
///
/// Holds one [`RowGroupAccess`] entry per row group; the methods on this type
/// address entries by their position (`idx`) in that list.
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetAccessPlan {
/// Access decision for each row group, in row group order.
row_groups: Vec<RowGroupAccess>,
}
/// How a single row group should be accessed.
#[derive(Debug, Clone, PartialEq)]
pub enum RowGroupAccess {
/// Do not read the row group (`should_scan` returns `false`).
Skip,
/// Read every row of the row group.
Scan,
/// Read the row group using the given [`RowSelection`].
///
/// `ParquetAccessPlan::into_overall_row_selection` rejects a selection whose
/// selector row counts do not sum to the row group's row count.
Selection(RowSelection),
}
impl RowGroupAccess {
pub fn should_scan(&self) -> bool {
match self {
RowGroupAccess::Skip => false,
RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
}
}
}
impl ParquetAccessPlan {
    /// Creates a plan that scans all `row_group_count` row groups.
    pub fn new_all(row_group_count: usize) -> Self {
        Self {
            row_groups: vec![RowGroupAccess::Scan; row_group_count],
        }
    }

    /// Creates a plan that skips all `row_group_count` row groups.
    pub fn new_none(row_group_count: usize) -> Self {
        Self {
            row_groups: vec![RowGroupAccess::Skip; row_group_count],
        }
    }

    /// Creates a plan from an explicit per-row-group access list.
    pub fn new(row_groups: Vec<RowGroupAccess>) -> Self {
        Self { row_groups }
    }

    /// Replaces the access for row group `idx` with `access`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    pub fn set(&mut self, idx: usize, access: RowGroupAccess) {
        self.row_groups[idx] = access;
    }

    /// Marks row group `idx` as skipped.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    pub fn skip(&mut self, idx: usize) {
        self.set(idx, RowGroupAccess::Skip);
    }

    /// Marks row group `idx` as fully scanned.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    pub fn scan(&mut self, idx: usize) {
        self.set(idx, RowGroupAccess::Scan);
    }

    /// Returns `true` if row group `idx` is scanned, fully or partially.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    pub fn should_scan(&self, idx: usize) -> bool {
        self.row_groups[idx].should_scan()
    }

    /// Narrows the access for row group `idx` by `selection`.
    ///
    /// * `Skip` stays `Skip`.
    /// * `Scan` becomes `Selection(selection)`.
    /// * An existing `Selection` is replaced by its intersection with
    ///   `selection`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
        self.row_groups[idx] = match &self.row_groups[idx] {
            // Already skipping the whole row group; nothing to narrow.
            RowGroupAccess::Skip => RowGroupAccess::Skip,
            RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
            RowGroupAccess::Selection(existing_selection) => {
                RowGroupAccess::Selection(existing_selection.intersection(&selection))
            }
        }
    }

    /// Converts this plan into one [`RowSelection`] covering the scanned row
    /// groups, in row group order.
    ///
    /// Returns `Ok(None)` when no row group uses
    /// [`RowGroupAccess::Selection`]. Otherwise the result is the
    /// concatenation of:
    ///
    /// * nothing for `Skip` row groups,
    /// * a single `select` of the row group's full row count for `Scan`,
    /// * the row group's own selectors for `Selection`.
    ///
    /// # Errors
    ///
    /// Returns an internal error if
    ///
    /// * a `Selection`'s selectors do not sum to exactly the row count the
    ///   metadata reports for that row group, or
    /// * a `Scan` row group's metadata reports a row count that does not fit
    ///   in `usize` (for example a negative value).
    ///
    /// # Panics
    ///
    /// Panics if `row_group_meta_data` does not have one entry per row group
    /// in this plan.
    pub fn into_overall_row_selection(
        self,
        row_group_meta_data: &[RowGroupMetaData],
    ) -> Result<Option<RowSelection>> {
        assert_eq!(row_group_meta_data.len(), self.row_groups.len());

        // Without at least one partial selection there is nothing to express
        // beyond which row groups are read at all.
        if !self
            .row_groups
            .iter()
            .any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
        {
            return Ok(None);
        }

        let mut selectors: Vec<RowSelector> = Vec::new();
        for (idx, (rg, rg_meta)) in self
            .row_groups
            .into_iter()
            .zip(row_group_meta_data.iter())
            .enumerate()
        {
            let row_group_row_count = rg_meta.num_rows();
            match rg {
                RowGroupAccess::Skip => {}
                RowGroupAccess::Scan => {
                    // Checked conversion: a plain `as usize` would turn a
                    // negative count into an enormous selector.
                    let Ok(row_count) = usize::try_from(row_group_row_count) else {
                        return internal_err!(
                            "Invalid row group metadata. Row group {idx} has {row_group_row_count} rows, \
                            which is not a valid row count"
                        );
                    };
                    selectors.push(RowSelector::select(row_count));
                }
                RowGroupAccess::Selection(selection) => {
                    let rows_in_selection = selection
                        .iter()
                        .map(|selector| selector.row_count)
                        .sum::<usize>();
                    // Compare without a lossy cast; a sum that does not fit in
                    // `i64` can never equal the metadata's row count.
                    if i64::try_from(rows_in_selection).ok() != Some(row_group_row_count) {
                        return internal_err!(
                            "Invalid ParquetAccessPlan Selection. Row group {idx} has {row_group_row_count} rows \
                            but selection only specifies {rows_in_selection} rows. \
                            Selection: {selection:?}"
                        );
                    }
                    selectors.extend(Vec::<RowSelector>::from(selection));
                }
            }
        }

        let total_selection: RowSelection = selectors.into_iter().collect();
        Ok(Some(total_selection))
    }

    /// Returns an iterator over the indexes of the row groups that are
    /// scanned (fully or partially), in ascending order.
    pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
        self.row_groups
            .iter()
            .enumerate()
            .filter_map(|(idx, access)| access.should_scan().then_some(idx))
    }

    /// Returns the indexes of the row groups that are scanned (fully or
    /// partially), in ascending order.
    pub fn row_group_indexes(&self) -> Vec<usize> {
        self.row_group_index_iter().collect()
    }

    /// Returns the number of row groups described by this plan.
    pub fn len(&self) -> usize {
        self.row_groups.len()
    }

    /// Returns `true` if this plan describes no row groups.
    pub fn is_empty(&self) -> bool {
        self.row_groups.is_empty()
    }

    /// Returns the per-row-group access decisions as a slice.
    pub fn inner(&self) -> &[RowGroupAccess] {
        &self.row_groups
    }

    /// Consumes the plan and returns the per-row-group access decisions.
    pub fn into_inner(self) -> Vec<RowGroupAccess> {
        self.row_groups
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use datafusion_common::assert_contains;
    use parquet::basic::LogicalType;
    use parquet::file::metadata::ColumnChunkMetaData;
    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
    use std::sync::{Arc, LazyLock};

    /// Builds a plan from `accesses` and returns the row group indexes it
    /// scans together with the outcome of converting it to an overall row
    /// selection against `ROW_GROUP_METADATA`.
    fn evaluate(
        accesses: Vec<RowGroupAccess>,
    ) -> (Vec<usize>, Result<Option<RowSelection>>) {
        let plan = ParquetAccessPlan::new(accesses);
        let indexes = plan.row_group_indexes();
        let selection = plan.into_overall_row_selection(&ROW_GROUP_METADATA);
        (indexes, selection)
    }

    /// Scanning every row group needs no row selection.
    #[test]
    fn test_only_scans() {
        let (row_group_indexes, outcome) = evaluate(vec![RowGroupAccess::Scan; 4]);
        let row_selection = outcome.unwrap();

        assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
        assert_eq!(row_selection, None);
    }

    /// Skipping every row group yields no indexes and no row selection.
    #[test]
    fn test_only_skips() {
        let (row_group_indexes, outcome) = evaluate(vec![RowGroupAccess::Skip; 4]);
        let row_selection = outcome.unwrap();

        assert_eq!(row_group_indexes, Vec::<usize>::new());
        assert_eq!(row_selection, None);
    }

    /// Scan, then a partial selection, then two skipped row groups.
    #[test]
    fn test_mixed_1() {
        let partial: RowSelection = vec![
            RowSelector::select(5),
            RowSelector::skip(7),
            RowSelector::select(8),
        ]
        .into();
        let (row_group_indexes, outcome) = evaluate(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Selection(partial),
            RowGroupAccess::Skip,
            RowGroupAccess::Skip,
        ]);
        let row_selection = outcome.unwrap();

        // Row group 0 is selected in full, followed by row group 1's selectors.
        let expected: RowSelection = vec![
            RowSelector::select(10),
            RowSelector::select(5),
            RowSelector::skip(7),
            RowSelector::select(8),
        ]
        .into();
        assert_eq!(row_group_indexes, vec![0, 1]);
        assert_eq!(row_selection, Some(expected));
    }

    /// Skip, scan, a partial selection, then another scan.
    #[test]
    fn test_mixed_2() {
        let partial: RowSelection = vec![
            RowSelector::select(5),
            RowSelector::skip(7),
            RowSelector::select(18),
        ]
        .into();
        let (row_group_indexes, outcome) = evaluate(vec![
            RowGroupAccess::Skip,
            RowGroupAccess::Scan,
            RowGroupAccess::Selection(partial),
            RowGroupAccess::Scan,
        ]);
        let row_selection = outcome.unwrap();

        // The skipped row group 0 contributes nothing to the selection.
        let expected: RowSelection = vec![
            RowSelector::select(20),
            RowSelector::select(5),
            RowSelector::skip(7),
            RowSelector::select(18),
            RowSelector::select(40),
        ]
        .into();
        assert_eq!(row_group_indexes, vec![1, 2, 3]);
        assert_eq!(row_selection, Some(expected));
    }

    /// A selection covering fewer rows than the row group holds is rejected.
    #[test]
    fn test_invalid_too_few() {
        let too_few: RowSelection =
            vec![RowSelector::select(5), RowSelector::skip(7)].into();
        let (row_group_indexes, outcome) = evaluate(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Selection(too_few),
            RowGroupAccess::Scan,
            RowGroupAccess::Scan,
        ]);
        let err = outcome.unwrap_err().to_string();

        assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
        assert_contains!(err, "Internal error: Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
    }

    /// A selection covering more rows than the row group holds is rejected.
    #[test]
    fn test_invalid_too_many() {
        let too_many: RowSelection = vec![
            RowSelector::select(10),
            RowSelector::skip(2),
            RowSelector::select(10),
        ]
        .into();
        let (row_group_indexes, outcome) = evaluate(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Selection(too_many),
            RowGroupAccess::Scan,
            RowGroupAccess::Scan,
        ]);
        let err = outcome.unwrap_err().to_string();

        assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
        assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
    }

    /// Metadata for four row groups holding 10, 20, 30 and 40 rows.
    static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
        let schema_descr = get_test_schema_descr();
        [10, 20, 30, 40]
            .into_iter()
            .map(|num_rows| row_group_with_rows(&schema_descr, num_rows))
            .collect()
    });

    /// Builds metadata for a row group with a single column chunk holding
    /// `num_rows` values.
    fn row_group_with_rows(
        schema_descr: &SchemaDescPtr,
        num_rows: i64,
    ) -> RowGroupMetaData {
        let column = ColumnChunkMetaData::builder(schema_descr.column(0))
            .set_num_values(num_rows)
            .build()
            .unwrap();
        RowGroupMetaData::builder(Arc::clone(schema_descr))
            .set_num_rows(num_rows)
            .set_column_metadata(vec![column])
            .build()
            .unwrap()
    }

    /// Returns a schema with one `BYTE_ARRAY` column "a" annotated as a string.
    fn get_test_schema_descr() -> SchemaDescPtr {
        use parquet::basic::Type as PhysicalType;
        use parquet::schema::types::Type as SchemaType;

        let string_column =
            SchemaType::primitive_type_builder("a", PhysicalType::BYTE_ARRAY)
                .with_logical_type(Some(LogicalType::String))
                .build()
                .map(Arc::new)
                .unwrap();
        let root = SchemaType::group_type_builder("schema")
            .with_fields(vec![string_column])
            .build()
            .map(Arc::new)
            .unwrap();
        Arc::new(SchemaDescriptor::new(root))
    }
}