use crate::error::{IoError, Result};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
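/// Statistics for a single column, aggregated across every row group in a
/// Parquet file. Minimum and maximum values are kept as the raw bytes stored
/// in the file footer; use the typed accessors (`min_f64`, `max_i64`, ...) to
/// decode them.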
#[derive(Debug, Clone)]
pub struct ColumnStatistics {
pub name: String,
pub null_count: Option<i64>,
pub distinct_count: Option<i64>,
pub min_value: Option<Vec<u8>>,
pub max_value: Option<Vec<u8>>,
pub num_values: i64,
}
impl ColumnStatistics {
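    /// Decodes the raw minimum as a little-endian `f64` (Parquet's plain
    /// encoding for DOUBLE statistics). Returns `None` if no minimum was
    /// recorded or the stored value is not exactly 8 bytes.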
pub fn min_f64(&self) -> Option<f64> {
self.min_value.as_ref().and_then(|bytes| {
if bytes.len() == 8 {
let mut array = [0u8; 8];
array.copy_from_slice(bytes);
Some(f64::from_le_bytes(array))
} else {
None
}
})
}
pub fn max_f64(&self) -> Option<f64> {
self.max_value.as_ref().and_then(|bytes| {
if bytes.len() == 8 {
let mut array = [0u8; 8];
array.copy_from_slice(bytes);
Some(f64::from_le_bytes(array))
} else {
None
}
})
}
pub fn min_i64(&self) -> Option<i64> {
self.min_value.as_ref().and_then(|bytes| {
if bytes.len() == 8 {
let mut array = [0u8; 8];
array.copy_from_slice(bytes);
Some(i64::from_le_bytes(array))
} else {
None
}
})
}
pub fn max_i64(&self) -> Option<i64> {
self.max_value.as_ref().and_then(|bytes| {
if bytes.len() == 8 {
let mut array = [0u8; 8];
array.copy_from_slice(bytes);
Some(i64::from_le_bytes(array))
} else {
None
}
})
}
pub fn min_i32(&self) -> Option<i32> {
self.min_value.as_ref().and_then(|bytes| {
if bytes.len() == 4 {
let mut array = [0u8; 4];
array.copy_from_slice(bytes);
Some(i32::from_le_bytes(array))
} else {
None
}
})
}
pub fn max_i32(&self) -> Option<i32> {
self.max_value.as_ref().and_then(|bytes| {
if bytes.len() == 4 {
let mut array = [0u8; 4];
array.copy_from_slice(bytes);
Some(i32::from_le_bytes(array))
} else {
None
}
})
}
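    /// Returns `true` if the column is known to contain at least one null value.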
    pub fn has_nulls(&self) -> bool {
        self.null_count.is_some_and(|count| count > 0)
    }
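    /// Percentage of values that are null, from 0.0 to 100.0. Returns 0.0 for
    /// an empty column or when no null count was recorded.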
    pub fn null_percentage(&self) -> f64 {
        if self.num_values == 0 {
            0.0
        } else {
            self.null_count.unwrap_or(0) as f64 / self.num_values as f64 * 100.0
        }
    }
}
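/// File-level statistics read from a Parquet footer: row and row-group counts,
/// writer metadata, and per-column statistics keyed by column name.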
#[derive(Debug, Clone)]
pub struct ParquetFileStatistics {
pub num_rows: i64,
pub num_row_groups: usize,
pub columns: HashMap<String, ColumnStatistics>,
pub version: i32,
pub created_by: Option<String>,
}
impl ParquetFileStatistics {
pub fn column_stats(&self, name: &str) -> Option<&ColumnStatistics> {
self.columns.get(name)
}
pub fn column_names(&self) -> Vec<&str> {
self.columns.keys().map(|s| s.as_str()).collect()
}
pub fn has_statistics(&self) -> bool {
!self.columns.is_empty()
}
}
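/// Reads file- and column-level statistics from a Parquet file's footer
/// metadata. Only the footer is parsed; no row data is loaded.
///
/// # Example
///
/// A usage sketch (not run as a doc test): `data.parquet` is a placeholder
/// path, and the import path depends on where this module lives.
///
/// ```ignore
/// let stats = read_parquet_statistics("data.parquet").expect("failed to read statistics");
/// println!("{} rows in {} row groups", stats.num_rows, stats.num_row_groups);
/// ```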
pub fn read_parquet_statistics<P: AsRef<Path>>(path: P) -> Result<ParquetFileStatistics> {
let file = File::open(path.as_ref()).map_err(|e| {
IoError::FileError(format!(
"Failed to open file '{}': {}",
path.as_ref().display(),
e
))
})?;
let reader = SerializedFileReader::new(file)
.map_err(|e| IoError::ParquetError(format!("Failed to create Parquet reader: {}", e)))?;
let metadata = reader.metadata();
extract_statistics(metadata)
}
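/// Aggregates the per-row-group column chunk statistics in already-parsed
/// footer metadata into a file-level `ParquetFileStatistics`.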
fn extract_statistics(metadata: &ParquetMetaData) -> Result<ParquetFileStatistics> {
let file_metadata = metadata.file_metadata();
let num_rows = file_metadata.num_rows();
let num_row_groups = metadata.num_row_groups();
let version = file_metadata.version();
let created_by = file_metadata.created_by().map(|s| s.to_string());
let mut columns: HashMap<String, ColumnStatistics> = HashMap::new();
let schema = file_metadata.schema_descr();
for (col_idx, column) in schema.columns().iter().enumerate() {
let column_name = column.name().to_string();
let mut total_null_count: Option<i64> = None;
let mut total_distinct_count: Option<i64> = None;
let mut total_num_values: i64 = 0;
let mut min_value: Option<Vec<u8>> = None;
let mut max_value: Option<Vec<u8>> = None;
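        // Aggregate this column's statistics across every row group.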
for rg_idx in 0..num_row_groups {
let row_group = metadata.row_group(rg_idx);
if col_idx < row_group.num_columns() {
let col_chunk = row_group.column(col_idx);
if let Some(stats) = col_chunk.statistics() {
if let Some(null_count) = stats.null_count_opt() {
total_null_count = Some(total_null_count.unwrap_or(0) + null_count as i64);
}
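                        // Summing per-row-group distinct counts can overcount at the
                        // file level, since a value may repeat across row groups.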
if let Some(distinct) = stats.distinct_count_opt() {
total_distinct_count =
Some(total_distinct_count.unwrap_or(0) + distinct as i64);
}
                    if let Some(min_bytes) = stats.min_bytes_opt() {
                        // Keep the byte-wise smallest raw min seen across row groups.
                        if min_value.as_deref().map_or(true, |cur| min_bytes < cur) {
                            min_value = Some(min_bytes.to_vec());
                        }
                    }
                    if let Some(max_bytes) = stats.max_bytes_opt() {
                        // Keep the byte-wise largest raw max seen across row groups.
                        if max_value.as_deref().map_or(true, |cur| max_bytes > cur) {
                            max_value = Some(max_bytes.to_vec());
                        }
                    }
}
total_num_values += col_chunk.num_values();
}
}
columns.insert(
column_name.clone(),
ColumnStatistics {
name: column_name,
null_count: total_null_count,
distinct_count: total_distinct_count,
min_value,
max_value,
num_values: total_num_values,
},
);
}
Ok(ParquetFileStatistics {
num_rows,
num_row_groups,
columns,
version,
created_by,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::parquet::write_parquet;
use scirs2_core::ndarray::Array1;
use tempfile::tempdir;
#[test]
fn test_read_statistics() {
        let dir = tempdir().expect("failed to create temp dir");
        let path = dir.path().join("stats_test.parquet");
        let data = Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        write_parquet(&path, &data, Default::default()).expect("failed to write Parquet file");
        let stats = read_parquet_statistics(&path).expect("failed to read statistics");
assert_eq!(stats.num_rows, 5);
assert!(stats.num_row_groups > 0);
assert!(stats.has_statistics());
}
#[test]
fn test_column_statistics() {
        let dir = tempdir().expect("failed to create temp dir");
        let path = dir.path().join("col_stats.parquet");
        let data = Array1::from_vec(vec![10.0, 20.0, 30.0, 40.0, 50.0]);
        write_parquet(&path, &data, Default::default()).expect("failed to write Parquet file");
        let stats = read_parquet_statistics(&path).expect("failed to read statistics");
        let col_stats = stats.column_stats("value").expect("missing 'value' column stats");
assert_eq!(col_stats.num_values, 5);
assert!(!col_stats.has_nulls());
assert_eq!(col_stats.null_percentage(), 0.0);
}
#[test]
fn test_statistics_min_max_f64() {
        let dir = tempdir().expect("failed to create temp dir");
        let path = dir.path().join("minmax.parquet");
        let data = Array1::from_vec(vec![5.5, 1.2, 9.8, 3.3, 7.1]);
        write_parquet(&path, &data, Default::default()).expect("failed to write Parquet file");
        let stats = read_parquet_statistics(&path).expect("failed to read statistics");
        let col_stats = stats.column_stats("value").expect("missing 'value' column stats");
assert!(col_stats.min_value.is_some() || col_stats.max_value.is_some());
}
#[test]
fn test_file_statistics_metadata() {
        let dir = tempdir().expect("failed to create temp dir");
        let path = dir.path().join("metadata.parquet");
        let data = Array1::from_vec(vec![1.0, 2.0, 3.0]);
        write_parquet(&path, &data, Default::default()).expect("failed to write Parquet file");
        let stats = read_parquet_statistics(&path).expect("failed to read statistics");
assert_eq!(stats.num_rows, 3);
assert!(stats.version > 0);
assert!(stats.created_by.is_some());
assert!(stats.column_names().contains(&"value"));
}
#[test]
fn test_statistics_large_file() {
        let dir = tempdir().expect("failed to create temp dir");
        let path = dir.path().join("large_stats.parquet");
        let data: Vec<f64> = (0..1000).map(|x| x as f64).collect();
        let array = Array1::from_vec(data);
        write_parquet(&path, &array, Default::default()).expect("failed to write Parquet file");
        let stats = read_parquet_statistics(&path).expect("failed to read statistics");
        assert_eq!(stats.num_rows, 1000);
        let col_stats = stats.column_stats("value").expect("missing 'value' column stats");
assert_eq!(col_stats.num_values, 1000);
}
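    #[test]
    fn test_decoding_helpers_roundtrip() {
        // Example without any file I/O: hand-built statistics exercise the
        // little-endian decoding helpers directly.
        let stats = ColumnStatistics {
            name: "value".to_string(),
            null_count: Some(0),
            distinct_count: None,
            min_value: Some(1.5f64.to_le_bytes().to_vec()),
            max_value: Some(9.5f64.to_le_bytes().to_vec()),
            num_values: 5,
        };
        assert_eq!(stats.min_f64(), Some(1.5));
        assert_eq!(stats.max_f64(), Some(9.5));
        // An 8-byte payload is not a valid i32, so the i32 accessor declines.
        assert_eq!(stats.min_i32(), None);
        assert!(!stats.has_nulls());
        assert_eq!(stats.null_percentage(), 0.0);
    }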
}