use datafusion_common::Result;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::ParquetMetaData;
use std::collections::HashMap;
pub fn reverse_row_selection(
row_selection: &RowSelection,
parquet_metadata: &ParquetMetaData,
row_groups_to_scan: &[usize],
) -> Result<RowSelection> {
let rg_metadata = parquet_metadata.row_groups();
let mut rg_row_ranges: Vec<(usize, usize, usize)> =
Vec::with_capacity(row_groups_to_scan.len());
let mut current_row = 0;
for &rg_idx in row_groups_to_scan {
let rg = &rg_metadata[rg_idx];
let num_rows = rg.num_rows() as usize;
rg_row_ranges.push((rg_idx, current_row, current_row + num_rows));
current_row += num_rows; }
let mut rg_selections: HashMap<usize, Vec<RowSelector>> = HashMap::new();
let mut current_file_row = 0;
for selector in row_selection.iter() {
let selector_end = current_file_row + selector.row_count;
for (rg_idx, rg_start, rg_end) in rg_row_ranges.iter() {
if current_file_row < *rg_end && selector_end > *rg_start {
let overlap_start = current_file_row.max(*rg_start);
let overlap_end = selector_end.min(*rg_end);
let overlap_count = overlap_end - overlap_start;
if overlap_count > 0 {
let entry = rg_selections.entry(*rg_idx).or_default();
if selector.skip {
entry.push(RowSelector::skip(overlap_count));
} else {
entry.push(RowSelector::select(overlap_count));
}
}
}
}
current_file_row = selector_end;
}
let mut reversed_selectors = Vec::new();
for &rg_idx in row_groups_to_scan.iter().rev() {
if let Some(selectors) = rg_selections.get(&rg_idx) {
reversed_selectors.extend(selectors.iter().cloned());
} else {
if let Some((_, start, end)) =
rg_row_ranges.iter().find(|(idx, _, _)| *idx == rg_idx)
{
reversed_selectors.push(RowSelector::select(end - start));
}
}
}
Ok(RowSelection::from(reversed_selectors))
}
#[cfg(test)]
mod tests {
    use crate::ParquetAccessPlan;
    use crate::RowGroupAccess;
    use crate::opener::PreparedAccessPlan;
    use arrow::datatypes::{DataType, Field, Schema};
    use bytes::Bytes;
    use parquet::arrow::ArrowWriter;
    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    use parquet::file::metadata::ParquetMetaData;
    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::SerializedFileReader;
    use std::sync::Arc;

    /// Writes an in-memory Parquet file with a single `Int32` column and
    /// returns its metadata.
    ///
    /// One batch is written per entry of `row_group_sizes` and the writer is
    /// flushed after each batch, so every entry is meant to become its own
    /// row group with that many rows.
    fn create_test_metadata(row_group_sizes: Vec<i64>) -> ParquetMetaData {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let mut buffer = Vec::new();
        {
            let props = parquet::file::properties::WriterProperties::builder().build();
            let mut writer =
                ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
            for &size in &row_group_sizes {
                let array = arrow::array::Int32Array::from(vec![1; size as usize]);
                let batch = arrow::record_batch::RecordBatch::try_new(
                    schema.clone(),
                    vec![Arc::new(array)],
                )
                .unwrap();
                writer.write(&batch).unwrap();
                writer.flush().unwrap();
            }
            writer.close().unwrap();
        }
        let bytes = Bytes::from(buffer);
        let reader = SerializedFileReader::new(bytes).unwrap();
        reader.metadata().clone()
    }

    /// Turns `access_plan` into a [`PreparedAccessPlan`] for the row groups
    /// described by `metadata`, panicking if preparation fails.
    fn prepare_plan(
        access_plan: ParquetAccessPlan,
        metadata: &ParquetMetaData,
    ) -> PreparedAccessPlan {
        PreparedAccessPlan::from_access_plan(access_plan, metadata.row_groups())
            .expect("Failed to create PreparedAccessPlan")
    }

    /// Reverses `plan`, panicking if the reversal fails.
    fn reverse_plan(
        plan: PreparedAccessPlan,
        metadata: &ParquetMetaData,
    ) -> PreparedAccessPlan {
        plan.reverse(metadata)
            .expect("Failed to reverse PreparedAccessPlan")
    }

    /// Total number of rows selected (i.e. not skipped) by the row selection
    /// of `plan`. Panics if the plan carries no row selection.
    fn selected_row_count(plan: &PreparedAccessPlan) -> usize {
        plan.row_selection
            .as_ref()
            .expect("plan should carry a row selection")
            .iter()
            .filter(|selector| !selector.skip)
            .map(|selector| selector.row_count)
            .sum()
    }

    #[test]
    fn test_prepared_access_plan_reverse_simple() {
        let metadata = create_test_metadata(vec![100, 100, 100]);
        let prepared_plan = prepare_plan(ParquetAccessPlan::new_all(3), &metadata);
        assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]);
        assert_eq!(prepared_plan.row_selection, None);

        // Without a row selection only the row group order changes.
        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]);
        assert_eq!(reversed_plan.row_selection, None);
    }

    #[test]
    fn test_prepared_access_plan_reverse_with_selection() {
        let metadata = create_test_metadata(vec![100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new_all(3);
        access_plan.scan_selection(
            0,
            RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]),
        );

        let prepared_plan = prepare_plan(access_plan, &metadata);
        let original_selected = selected_row_count(&prepared_plan);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        let reversed_selected = selected_row_count(&reversed_plan);

        assert_eq!(
            original_selected, reversed_selected,
            "Total selected rows should remain the same"
        );
    }

    #[test]
    fn test_prepared_access_plan_reverse_multi_row_group_selection() {
        let metadata = create_test_metadata(vec![100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new_all(3);
        access_plan.scan_selection(
            0,
            RowSelection::from(vec![RowSelector::skip(50), RowSelector::select(50)]),
        );
        access_plan.scan_selection(
            1,
            RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]),
        );

        let prepared_plan = prepare_plan(access_plan, &metadata);
        let original_selected = selected_row_count(&prepared_plan);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        let reversed_selected = selected_row_count(&reversed_plan);

        assert_eq!(original_selected, reversed_selected);
    }

    #[test]
    fn test_prepared_access_plan_reverse_empty_selection() {
        let metadata = create_test_metadata(vec![100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new_all(3);
        // Every row of every row group is skipped.
        for i in 0..3 {
            access_plan
                .scan_selection(i, RowSelection::from(vec![RowSelector::skip(100)]));
        }

        let prepared_plan = prepare_plan(access_plan, &metadata);
        let reversed_plan = reverse_plan(prepared_plan, &metadata);

        assert_eq!(selected_row_count(&reversed_plan), 0);
    }

    #[test]
    fn test_prepared_access_plan_reverse_different_row_group_sizes() {
        let metadata = create_test_metadata(vec![50, 150, 100]);
        let mut access_plan = ParquetAccessPlan::new_all(3);
        access_plan.scan_selection(
            0,
            RowSelection::from(vec![RowSelector::skip(25), RowSelector::select(25)]),
        );
        access_plan.scan_selection(1, RowSelection::from(vec![RowSelector::select(150)]));
        access_plan.scan_selection(
            2,
            RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]),
        );

        let prepared_plan = prepare_plan(access_plan, &metadata);
        let original_selected = selected_row_count(&prepared_plan);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        let reversed_selected = selected_row_count(&reversed_plan);

        assert_eq!(original_selected, reversed_selected);
    }

    #[test]
    fn test_prepared_access_plan_reverse_single_row_group() {
        let metadata = create_test_metadata(vec![100]);
        let mut access_plan = ParquetAccessPlan::new_all(1);
        access_plan.scan_selection(
            0,
            RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]),
        );

        let prepared_plan = prepare_plan(access_plan, &metadata);
        let original_selected = selected_row_count(&prepared_plan);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        assert_eq!(reversed_plan.row_group_indexes, vec![0]);
        let reversed_selected = selected_row_count(&reversed_plan);

        assert_eq!(original_selected, reversed_selected);
        assert_eq!(original_selected, 50);
    }

    #[test]
    fn test_prepared_access_plan_reverse_complex_pattern() {
        let metadata = create_test_metadata(vec![100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new_all(3);
        access_plan.scan_selection(
            0,
            RowSelection::from(vec![
                RowSelector::select(30),
                RowSelector::skip(40),
                RowSelector::select(30),
            ]),
        );
        access_plan.scan_selection(
            1,
            RowSelection::from(vec![RowSelector::skip(50), RowSelector::select(50)]),
        );
        access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));

        let prepared_plan = prepare_plan(access_plan, &metadata);
        let original_selected = selected_row_count(&prepared_plan);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        let reversed_selected = selected_row_count(&reversed_plan);

        assert_eq!(original_selected, reversed_selected);
        assert_eq!(original_selected, 210);
    }

    #[test]
    fn test_prepared_access_plan_reverse_with_skipped_row_groups() {
        let metadata = create_test_metadata(vec![100, 100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Skip,
            RowGroupAccess::Scan,
            RowGroupAccess::Scan,
        ]);
        access_plan.scan_selection(0, RowSelection::from(vec![RowSelector::select(100)]));
        access_plan.scan_selection(
            2,
            RowSelection::from(vec![RowSelector::select(25), RowSelector::skip(75)]),
        );
        access_plan.scan_selection(3, RowSelection::from(vec![RowSelector::select(100)]));

        let prepared_plan = prepare_plan(access_plan, &metadata);
        assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
        assert_eq!(selected_row_count(&prepared_plan), 225);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        assert_eq!(
            reversed_plan.row_group_indexes,
            vec![3, 2, 0],
            "Row groups should be reversed"
        );
        assert_eq!(
            selected_row_count(&reversed_plan),
            225,
            "Total selected rows should remain the same"
        );
        // RG3(100) merges with the leading select(25) of RG2, followed by the
        // skip(75) of RG2 and finally RG0(100).
        assert_eq!(
            reversed_plan.row_selection,
            Some(RowSelection::from(vec![
                RowSelector::select(125),
                RowSelector::skip(75),
                RowSelector::select(100),
            ]))
        );
    }

    #[test]
    fn test_prepared_access_plan_reverse_alternating_row_groups() {
        let metadata = create_test_metadata(vec![100, 100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Skip,
            RowGroupAccess::Scan,
            RowGroupAccess::Skip,
        ]);
        access_plan.scan_selection(0, RowSelection::from(vec![RowSelector::select(100)]));
        access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));

        let prepared_plan = prepare_plan(access_plan, &metadata);
        let original_selected = selected_row_count(&prepared_plan);
        assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        assert_eq!(reversed_plan.row_group_indexes, vec![2, 0]);
        let reversed_selected = selected_row_count(&reversed_plan);

        assert_eq!(original_selected, reversed_selected);
        assert_eq!(original_selected, 200);
    }

    #[test]
    fn test_prepared_access_plan_reverse_middle_row_group_only() {
        let metadata = create_test_metadata(vec![100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Skip,
            RowGroupAccess::Scan,
            RowGroupAccess::Skip,
        ]);
        access_plan.scan_selection(1, RowSelection::from(vec![RowSelector::select(100)]));

        let prepared_plan = prepare_plan(access_plan, &metadata);
        let original_selected = selected_row_count(&prepared_plan);
        assert_eq!(prepared_plan.row_group_indexes, vec![1]);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        assert_eq!(reversed_plan.row_group_indexes, vec![1]);
        let reversed_selected = selected_row_count(&reversed_plan);

        assert_eq!(original_selected, reversed_selected);
        assert_eq!(original_selected, 100);
    }

    #[test]
    fn test_prepared_access_plan_reverse_with_skipped_row_groups_detailed() {
        let metadata = create_test_metadata(vec![100, 100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Skip,
            RowGroupAccess::Scan,
            RowGroupAccess::Scan,
        ]);
        access_plan.scan_selection(0, RowSelection::from(vec![RowSelector::select(100)]));
        access_plan.scan_selection(
            2,
            RowSelection::from(vec![RowSelector::select(25), RowSelector::skip(75)]),
        );
        access_plan.scan_selection(3, RowSelection::from(vec![RowSelector::select(100)]));

        let prepared_plan = prepare_plan(access_plan, &metadata);
        assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
        assert_eq!(
            prepared_plan.row_selection,
            Some(RowSelection::from(vec![
                RowSelector::select(125),
                RowSelector::skip(75),
                RowSelector::select(100),
            ])),
            "Original should merge to select(125) from RG0(100) + RG2(25), \
             skip(75) from RG2, select(100) from RG3"
        );
        assert_eq!(selected_row_count(&prepared_plan), 225);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        assert_eq!(
            reversed_plan.row_group_indexes,
            vec![3, 2, 0],
            "Row groups should be reversed"
        );
        assert_eq!(
            reversed_plan.row_selection,
            Some(RowSelection::from(vec![
                RowSelector::select(125),
                RowSelector::skip(75),
                RowSelector::select(100),
            ])),
            "Reversed should merge to select(125) from RG3(100) + RG2(25), \
             skip(75) from RG2, select(100) from RG0"
        );
        assert_eq!(
            selected_row_count(&reversed_plan),
            225,
            "Total selected rows should remain the same"
        );
    }

    #[test]
    fn test_prepared_access_plan_reverse_complex_pattern_detailed() {
        let metadata = create_test_metadata(vec![100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new_all(3);
        access_plan.scan_selection(
            0,
            RowSelection::from(vec![
                RowSelector::select(30),
                RowSelector::skip(40),
                RowSelector::select(30),
            ]),
        );
        access_plan.scan_selection(
            1,
            RowSelection::from(vec![RowSelector::skip(50), RowSelector::select(50)]),
        );
        access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));

        let prepared_plan = prepare_plan(access_plan, &metadata);
        assert_eq!(selected_row_count(&prepared_plan), 210);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        assert_eq!(
            selected_row_count(&reversed_plan),
            210,
            "Total selected rows should remain the same (30 + 30 + 50 + 100)"
        );
        assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]);
    }

    #[test]
    fn test_prepared_access_plan_reverse_alternating_detailed() {
        let metadata = create_test_metadata(vec![100, 100, 100, 100]);
        let mut access_plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Skip,
            RowGroupAccess::Scan,
            RowGroupAccess::Skip,
        ]);
        access_plan.scan_selection(
            0,
            RowSelection::from(vec![RowSelector::select(30), RowSelector::skip(70)]),
        );
        access_plan.scan_selection(
            2,
            RowSelection::from(vec![RowSelector::skip(20), RowSelector::select(80)]),
        );

        let prepared_plan = prepare_plan(access_plan, &metadata);
        assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);
        assert_eq!(selected_row_count(&prepared_plan), 110);

        let reversed_plan = reverse_plan(prepared_plan, &metadata);
        assert_eq!(reversed_plan.row_group_indexes, vec![2, 0]);
        assert_eq!(selected_row_count(&reversed_plan), 110);
        // skip(20) from RG2, then select(80) of RG2 merged with select(30) of
        // RG0, then skip(70) from RG0.
        assert_eq!(
            reversed_plan.row_selection,
            Some(RowSelection::from(vec![
                RowSelector::skip(20),
                RowSelector::select(110),
                RowSelector::skip(70),
            ])),
            "Reversed should have 3 selectors"
        );
    }
}