use crate::TblError;
use futures::stream::{self, StreamExt};
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use polars::prelude::*;
use std::collections::HashMap;
/// Returns the number of rows recorded in a parquet file's footer metadata.
///
/// Only the footer is decoded; no row data is read because the builder is never turned
/// into a record-batch stream. The footer stores the count as an `i64`, which is cast to
/// `u64` with `as`.
///
/// # Errors
/// Fails if the file cannot be opened or its parquet footer cannot be decoded.
pub async fn get_parquet_row_count(path: &std::path::Path) -> Result<u64, TblError> {
    let handle = tokio::fs::File::open(path).await?;
    let reader_builder = ParquetRecordBatchStreamBuilder::new(handle).await?;
    let reader_builder = reader_builder.with_batch_size(1);
    let n_rows = reader_builder.metadata().file_metadata().num_rows();
    Ok(n_rows as u64)
}
/// Reads the row counts of several parquet files concurrently.
///
/// At most 10 footers are read at a time. The returned counts are in the same order as
/// `paths`.
///
/// # Errors
/// Every read runs to completion before results are inspected. If any read failed, the
/// error from the earliest failing path (in input order) is returned.
pub async fn get_parquet_row_counts(paths: &[&std::path::Path]) -> Result<Vec<u64>, TblError> {
    let per_file: Vec<Result<u64, TblError>> = stream::iter(paths)
        .map(|&path| get_parquet_row_count(path))
        .buffered(10)
        .collect()
        .await;
    per_file.into_iter().collect()
}
/// Resolves the polars schema of a parquet file.
///
/// The polars scan runs on tokio's blocking thread pool (`spawn_blocking`), not on the
/// async executor.
///
/// # Errors
/// Fails if polars cannot scan the file or resolve its schema, or if the blocking task
/// fails to join.
pub async fn get_parquet_schema(path: &std::path::Path) -> Result<Arc<Schema>, TblError> {
    let owned_path = path.to_path_buf();
    let join_result = tokio::task::spawn_blocking(move || {
        let mut frame = LazyFrame::scan_parquet(owned_path, ScanArgsParquet::default())?;
        let resolved = frame.schema()?;
        Ok(resolved)
    })
    .await;
    join_result?
}
/// Resolves the polars schemas of several parquet files concurrently.
///
/// Up to 10 files are scanned at once. Schemas are returned in the order of `paths`.
///
/// # Errors
/// All scans finish before errors are checked. The error from the earliest failing path
/// (in input order) is returned.
pub async fn get_parquet_schemas(
    paths: &[std::path::PathBuf],
) -> Result<Vec<Arc<Schema>>, TblError> {
    let per_file: Vec<Result<Arc<Schema>, TblError>> = stream::iter(paths)
        .map(|path| get_parquet_schema(path.as_path()))
        .buffered(10)
        .collect()
        .await;
    per_file.into_iter().collect()
}
/// Aggregate size and row statistics for one or more tabular (parquet) files.
///
/// A single-file summary is built by [`get_parquet_summary`]. Summaries are merged with
/// [`combine_tabular_summaries`].
#[derive(Clone, Debug, Default)]
pub struct TabularSummary {
    /// Number of files represented by this summary.
    pub n_files: u64,
    /// Bytes as stored on disk (the file length, for a single-file summary).
    pub n_bytes_compressed: u64,
    /// Sum of the row groups' `total_byte_size` values from the parquet footer.
    pub n_bytes_uncompressed: u64,
    /// Total number of rows.
    pub n_rows: u64,
    /// Schema of the data. A combined summary keeps the schema of its first input.
    pub schema: Arc<Schema>,
    /// Per-column byte totals in file column order. Empty when columns were not collected
    /// or the file has no row groups.
    pub columns: Vec<TabularColumnSummary>,
}
/// Byte totals for a single column.
///
/// Values are summed over the column chunks of every row group and, once combined, over
/// files.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct TabularColumnSummary {
    /// Sum of the column chunks' compressed sizes in bytes.
    pub n_bytes_compressed: u64,
    /// Sum of the column chunks' uncompressed sizes in bytes.
    pub n_bytes_uncompressed: u64,
}
pub async fn get_parquet_summary(path: &std::path::Path) -> Result<TabularSummary, TblError> {
let metadata = std::fs::metadata(path)?;
let n_bytes_compressed = metadata.len();
let n_rows = get_parquet_row_count(path).await?;
let schema = get_parquet_schema(path).await?;
let parquet_metadata = get_parquet_metadata(path).await?;
let columns = get_parquet_column_summaries(parquet_metadata.clone()).await?;
let n_bytes_uncompressed = get_parquet_n_bytes_uncompressed(parquet_metadata);
Ok(TabularSummary {
n_files: 1,
n_bytes_compressed,
n_bytes_uncompressed,
n_rows,
schema,
columns,
})
}
/// Opens a parquet file and returns its decoded footer metadata.
///
/// The record-batch stream builder is used only to decode the footer. No row data is
/// read because the builder is never turned into a stream.
///
/// # Errors
/// Fails if the file cannot be opened or its parquet footer cannot be decoded.
pub async fn get_parquet_metadata(
    path: &std::path::Path,
) -> Result<std::sync::Arc<parquet::file::metadata::ParquetMetaData>, TblError> {
    let handle = tokio::fs::File::open(path).await?;
    let reader_builder = ParquetRecordBatchStreamBuilder::new(handle)
        .await?
        .with_batch_size(1);
    let metadata = std::sync::Arc::clone(reader_builder.metadata());
    Ok(metadata)
}
/// Sums the `total_byte_size` of every row group in the given footer metadata.
///
/// Returns 0 for a file with no row groups.
pub fn get_parquet_n_bytes_uncompressed(
    metadata: Arc<parquet::file::metadata::ParquetMetaData>,
) -> u64 {
    let mut total: u64 = 0;
    for row_group in metadata.row_groups() {
        total += row_group.total_byte_size() as u64;
    }
    total
}
/// Totals the compressed and uncompressed byte sizes of each column across all row groups.
///
/// The column count is taken from the first row group, so a file without row groups
/// yields an empty `Vec`. No I/O is performed; only the given footer metadata is walked.
pub async fn get_parquet_column_summaries(
    metadata: Arc<parquet::file::metadata::ParquetMetaData>,
) -> Result<Vec<TabularColumnSummary>, TblError> {
    let row_groups = metadata.row_groups();
    let n_columns = row_groups.first().map_or(0, |first| first.columns().len());
    let mut totals = vec![TabularColumnSummary::default(); n_columns];
    for row_group in row_groups {
        // `zip` stops at the shorter side: chunks beyond the first row group's width are ignored.
        for (total, chunk) in totals.iter_mut().zip(row_group.columns()) {
            total.n_bytes_compressed += chunk.compressed_size() as u64;
            total.n_bytes_uncompressed += chunk.uncompressed_size() as u64;
        }
    }
    Ok(totals)
}
/// Summarizes several parquet files concurrently with [`get_parquet_summary`].
///
/// Up to 10 files are processed at once. Summaries are returned in the order of `paths`.
///
/// # Errors
/// All files are processed before errors are checked. The error from the earliest
/// failing path (in input order) is returned.
pub async fn get_parquet_summaries(
    paths: &[std::path::PathBuf],
) -> Result<Vec<TabularSummary>, TblError> {
    let per_file: Vec<Result<TabularSummary, TblError>> = stream::iter(paths)
        .map(|path| get_parquet_summary(path.as_path()))
        .buffered(10)
        .collect()
        .await;
    per_file.into_iter().collect()
}
/// Merges several summaries into one by summing their file, byte and row counts.
///
/// The result's schema is the first summary's schema. The other schemas are not checked.
/// When `include_columns` is true, per-column totals are merged element-wise. Otherwise
/// the result's `columns` stays empty. An empty input yields `TabularSummary::default()`.
///
/// # Errors
/// With `include_columns`, returns `TblError::SchemaError` if two non-empty column lists
/// have different lengths.
pub fn combine_tabular_summaries(
    summaries: &[&TabularSummary],
    include_columns: bool,
) -> Result<TabularSummary, TblError> {
    let mut combined = TabularSummary::default();
    if let Some(first) = summaries.first() {
        combined.schema = first.schema.clone();
    }
    for summary in summaries {
        combined.n_files += summary.n_files;
        combined.n_bytes_compressed += summary.n_bytes_compressed;
        combined.n_bytes_uncompressed += summary.n_bytes_uncompressed;
        combined.n_rows += summary.n_rows;
        if include_columns {
            combined.columns =
                combine_tabular_columns_summaries(&combined.columns, &summary.columns)?;
        }
    }
    Ok(combined)
}
/// Adds two per-column summary lists element-wise.
///
/// An empty list acts as the identity, so accumulation can start from an empty `Vec`.
///
/// # Errors
/// Returns `TblError::SchemaError` when both lists are non-empty and their lengths differ.
fn combine_tabular_columns_summaries(
    lhs: &[TabularColumnSummary],
    rhs: &[TabularColumnSummary],
) -> Result<Vec<TabularColumnSummary>, TblError> {
    if lhs.is_empty() {
        return Ok(rhs.to_vec());
    }
    if rhs.is_empty() {
        return Ok(lhs.to_vec());
    }
    if lhs.len() != rhs.len() {
        return Err(TblError::SchemaError(
            "different number of columns".to_string(),
        ));
    }
    let merged: Vec<TabularColumnSummary> = lhs
        .iter()
        .zip(rhs)
        .map(|(left, right)| combine_tabular_column_summary(left, right))
        .collect();
    Ok(merged)
}
/// Adds the byte totals of two column summaries field by field.
fn combine_tabular_column_summary(
    lhs: &TabularColumnSummary,
    rhs: &TabularColumnSummary,
) -> TabularColumnSummary {
    let n_bytes_compressed = lhs.n_bytes_compressed + rhs.n_bytes_compressed;
    let n_bytes_uncompressed = lhs.n_bytes_uncompressed + rhs.n_bytes_uncompressed;
    TabularColumnSummary {
        n_bytes_compressed,
        n_bytes_uncompressed,
    }
}
/// Groups summaries by schema and combines each group, including per-column totals.
///
/// Keys compare by schema value, because `Arc`'s `Hash`/`Eq` delegate to the pointee.
/// Distinct `Arc`s holding equal schemas therefore land in the same group.
///
/// # Errors
/// Propagates the error from [`combine_tabular_summaries`] when a group's per-column
/// summaries cannot be merged (non-empty column lists of differing length).
pub fn summarize_by_schema(
    summaries: &[&TabularSummary],
) -> Result<HashMap<Arc<Schema>, TabularSummary>, TblError> {
    let mut groups: HashMap<Arc<Schema>, Vec<&TabularSummary>> = HashMap::new();
    for &summary in summaries {
        groups
            .entry(summary.schema.clone())
            .or_default()
            .push(summary);
    }

    let mut combined = HashMap::new();
    for (schema, members) in groups {
        let merged = combine_tabular_summaries(&members, true)?;
        combined.insert(schema, merged);
    }
    Ok(combined)
}