use clap::Parser;
use parquet::data_type::ByteArray;
use parquet::errors::{ParquetError, Result};
use parquet::file::page_index::column_index::{
ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
};
use parquet::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;
use std::fs::File;
// Command-line arguments for the page-index printer, parsed via clap's derive API.
// Plain `//` comments are used on purpose: clap's derive can pick up `///` doc
// comments as help text, so adding those could change the generated CLI help.
#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the page index of a parquet file"), long_about = None)]
struct Args {
// Path of the parquet file to open; no `short`/`long` flag is declared for it.
#[clap(help("Path to a parquet file"))]
file: String,
// Name of the column whose page index is printed; compared against each schema
// column's `name()` in `Args::run`.
#[clap(help("Column name to print"))]
column: String,
}
impl Args {
    /// Opens the parquet file at `self.file` with page-index reading enabled and prints,
    /// for every row group, the page index entries of the column named `self.column`.
    ///
    /// The column is selected as the first schema column whose `name()` equals
    /// `self.column`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read, if no schema column has
    /// the requested name, if the metadata carries no column index or no offset index,
    /// if a row group has no offset-index or column-index entry for the selected
    /// column, or if printing a page index fails.
    fn run(&self) -> Result<()> {
        let file = File::open(&self.file)?;
        // The page index is only loaded when explicitly requested through the options.
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options)?;

        let schema = reader.metadata().file_metadata().schema_descr();
        let column_idx = schema
            .columns()
            .iter()
            .position(|x| x.name() == self.column.as_str())
            .ok_or_else(|| {
                ParquetError::General(format!("Failed to find column {}", self.column))
            })?;

        let column_index = reader
            .metadata()
            .column_index()
            .ok_or_else(|| ParquetError::General("Column index not found".to_string()))?;

        let offset_index = reader
            .metadata()
            .offset_index()
            .ok_or_else(|| ParquetError::General("Offset index not found".to_string()))?;

        for (row_group_idx, ((column_indices, offset_indices), row_group)) in column_index
            .iter()
            .zip(offset_index)
            .zip(reader.metadata().row_groups())
            .enumerate()
        {
            println!("Row Group: {row_group_idx}");

            let offset_index = offset_indices.get(column_idx).ok_or_else(|| {
                ParquetError::General(format!(
                    "No offset index for row group {row_group_idx} column chunk {column_idx}"
                ))
            })?;

            let row_counts =
                compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows());

            // Checked lookup, mirroring the offset index above: plain indexing would
            // panic if this row group's index list is shorter than the schema.
            let chunk_index = column_indices.get(column_idx).ok_or_else(|| {
                ParquetError::General(format!(
                    "No column index for row group {row_group_idx} column chunk {column_idx}"
                ))
            })?;

            match chunk_index {
                ColumnIndexMetaData::NONE => println!("NO INDEX"),
                ColumnIndexMetaData::BOOLEAN(v) => {
                    print_index::<bool>(v, offset_index, &row_counts)?
                }
                ColumnIndexMetaData::INT32(v) => print_index(v, offset_index, &row_counts)?,
                ColumnIndexMetaData::INT64(v) => print_index(v, offset_index, &row_counts)?,
                ColumnIndexMetaData::INT96(v) => print_index(v, offset_index, &row_counts)?,
                ColumnIndexMetaData::FLOAT(v) => print_index(v, offset_index, &row_counts)?,
                ColumnIndexMetaData::DOUBLE(v) => print_index(v, offset_index, &row_counts)?,
                ColumnIndexMetaData::BYTE_ARRAY(v) => {
                    print_bytes_index(v, offset_index, &row_counts)?
                }
                ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
                    print_bytes_index(v, offset_index, &row_counts)?
                }
            }
        }
        Ok(())
    }
}
/// Computes the number of rows in each page from the pages' first row indexes.
///
/// Every page except the last spans the rows between its own `first_row_index` and
/// that of the following page; the last page spans up to `rows`, the total passed in
/// by the caller. An empty `offset_index` yields an empty vector.
fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
    match offset_index.last() {
        None => Vec::new(),
        Some(final_page) => offset_index
            .windows(2)
            .map(|pair| pair[1].first_row_index - pair[0].first_row_index)
            // The last page has no successor, so it is measured against the total.
            .chain(std::iter::once(rows - final_page.first_row_index))
            .collect(),
    }
}
/// Prints one line per page of a column chunk whose index values implement `Display`.
///
/// Each line shows the page number, the page's offset and compressed size from
/// `offset_index`, its row count from `row_counts`, and the page's min and max values
/// from `column_index` (`NONE` when a value is absent).
///
/// # Errors
///
/// Returns `ParquetError::General` when the number of pages in `column_index` differs
/// from the number of page locations in `offset_index`.
fn print_index<T: std::fmt::Display>(
    column_index: &PrimitiveColumnIndex<T>,
    offset_index: &OffsetIndexMetaData,
    row_counts: &[i64],
) -> Result<()> {
    let lengths_agree =
        column_index.num_pages() as usize == offset_index.page_locations.len();
    if !lengths_agree {
        let message = format!(
            "Index length mismatch, got {} and {}",
            column_index.num_pages(),
            offset_index.page_locations.len()
        );
        return Err(ParquetError::General(message));
    }

    let bounds = column_index
        .min_values_iter()
        .zip(column_index.max_values_iter());
    let pages = offset_index.page_locations().iter().zip(row_counts);

    for (page_no, ((lower, upper), (location, rows))) in bounds.zip(pages).enumerate() {
        print!(
            "Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}",
            page_no, location.offset, location.compressed_page_size, rows
        );

        if let Some(value) = lower {
            print!(", min {value:>10}");
        } else {
            print!(", min {:>10}", "NONE");
        }

        if let Some(value) = upper {
            print!(", max {value:>10}");
        } else {
            print!(", max {:>10}", "NONE");
        }

        println!();
    }

    Ok(())
}
/// Prints one line per page of a byte-array column chunk.
///
/// Each line shows the page number, the page's offset and compressed size from
/// `offset_index`, its row count from `row_counts`, and the page's min and max values
/// from `column_index`. A value that is valid UTF-8 is printed as text; otherwise it is
/// printed through `ByteArray`'s `Display` implementation. Absent values print as
/// `NONE`.
///
/// # Errors
///
/// Returns `ParquetError::General` when the number of pages in `column_index` differs
/// from the number of page locations in `offset_index`.
fn print_bytes_index(
    column_index: &ByteArrayColumnIndex,
    offset_index: &OffsetIndexMetaData,
    row_counts: &[i64],
) -> Result<()> {
    if column_index.num_pages() as usize != offset_index.page_locations.len() {
        return Err(ParquetError::General(format!(
            "Index length mismatch, got {} and {}",
            column_index.num_pages(),
            offset_index.page_locations.len()
        )));
    }

    for (idx, (((min, max), o), row_count)) in column_index
        .min_values_iter()
        .zip(column_index.max_values_iter())
        .zip(offset_index.page_locations())
        .zip(row_counts)
        .enumerate()
    {
        print!(
            "Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}",
            idx, o.offset, o.compressed_page_size, row_count
        );

        match min {
            Some(m) => match String::from_utf8(m.to_vec()) {
                Ok(s) => print!(", min {s:>10}"),
                Err(_) => print!(", min {:>10}", ByteArray::from(m)),
            },
            None => print!(", min {:>10}", "NONE"),
        }

        match max {
            Some(m) => match String::from_utf8(m.to_vec()) {
                Ok(s) => print!(", max {s:>10}"),
                // Label must read "max" here; it previously printed "min" for
                // non-UTF-8 max values.
                Err(_) => print!(", max {:>10}", ByteArray::from(m)),
            },
            None => print!(", max {:>10}", "NONE"),
        }

        println!()
    }

    Ok(())
}
/// Program entry point: parses the command-line arguments and runs the printer,
/// returning its result as the process result.
fn main() -> Result<()> {
    let args = Args::parse();
    args.run()
}