use crate::error::{IoError, Result};
use crate::parquet::reader::{ParquetChunkIterator, ParquetData};
use crate::parquet::statistics::read_parquet_statistics;
use arrow::array::RecordBatchReader;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use std::fs::File;
use std::path::Path;
/// A filter predicate over a Parquet column, composable via `And`/`Or`/`Not`.
///
/// Leaf variants carry the target column name plus (where applicable) the
/// comparison value; composite variants combine sub-predicates.
#[derive(Debug, Clone)]
pub enum ParquetPredicate {
    /// column == value
    Eq(String, PredicateValue),
    /// column != value
    NotEq(String, PredicateValue),
    /// column < value
    Lt(String, PredicateValue),
    /// column <= value
    LtEq(String, PredicateValue),
    /// column > value
    Gt(String, PredicateValue),
    /// column >= value
    GtEq(String, PredicateValue),
    /// column IS NULL
    IsNull(String),
    /// column IS NOT NULL
    IsNotNull(String),
    /// column IN (values)
    In(String, Vec<PredicateValue>),
    /// All sub-predicates must hold.
    And(Vec<ParquetPredicate>),
    /// At least one sub-predicate must hold.
    Or(Vec<ParquetPredicate>),
    /// Logical negation of the inner predicate.
    Not(Box<ParquetPredicate>),
}
/// A typed literal compared against column values in a [`ParquetPredicate`].
///
/// Only `PartialEq` (not `Eq`) is derived because the float variants cannot
/// satisfy total equality.
#[derive(Debug, Clone, PartialEq)]
pub enum PredicateValue {
    Float64(f64),
    Float32(f32),
    Int64(i64),
    Int32(i32),
    Boolean(bool),
    String(String),
}
impl ParquetPredicate {
pub fn eq(column: impl Into<String>, value: PredicateValue) -> Self {
Self::Eq(column.into(), value)
}
pub fn not_eq(column: impl Into<String>, value: PredicateValue) -> Self {
Self::NotEq(column.into(), value)
}
pub fn lt(column: impl Into<String>, value: PredicateValue) -> Self {
Self::Lt(column.into(), value)
}
pub fn lt_eq(column: impl Into<String>, value: PredicateValue) -> Self {
Self::LtEq(column.into(), value)
}
pub fn gt(column: impl Into<String>, value: PredicateValue) -> Self {
Self::Gt(column.into(), value)
}
pub fn gt_eq(column: impl Into<String>, value: PredicateValue) -> Self {
Self::GtEq(column.into(), value)
}
pub fn is_null(column: impl Into<String>) -> Self {
Self::IsNull(column.into())
}
pub fn is_not_null(column: impl Into<String>) -> Self {
Self::IsNotNull(column.into())
}
pub fn in_values(column: impl Into<String>, values: Vec<PredicateValue>) -> Self {
Self::In(column.into(), values)
}
pub fn and(predicates: Vec<ParquetPredicate>) -> Self {
Self::And(predicates)
}
pub fn or(predicates: Vec<ParquetPredicate>) -> Self {
Self::Or(predicates)
}
pub fn not(predicate: ParquetPredicate) -> Self {
Self::Not(Box::new(predicate))
}
pub fn can_skip_row_group_f64(&self, min: f64, max: f64) -> bool {
match self {
ParquetPredicate::Lt(_, PredicateValue::Float64(val)) => min >= *val,
ParquetPredicate::LtEq(_, PredicateValue::Float64(val)) => min > *val,
ParquetPredicate::Gt(_, PredicateValue::Float64(val)) => max <= *val,
ParquetPredicate::GtEq(_, PredicateValue::Float64(val)) => max < *val,
ParquetPredicate::Eq(_, PredicateValue::Float64(val)) => max < *val || min > *val,
ParquetPredicate::And(predicates) => predicates
.iter()
.any(|p| p.can_skip_row_group_f64(min, max)),
ParquetPredicate::Or(predicates) => predicates
.iter()
.all(|p| p.can_skip_row_group_f64(min, max)),
_ => false, }
}
pub fn column_name(&self) -> Option<&str> {
match self {
ParquetPredicate::Eq(col, _)
| ParquetPredicate::NotEq(col, _)
| ParquetPredicate::Lt(col, _)
| ParquetPredicate::LtEq(col, _)
| ParquetPredicate::Gt(col, _)
| ParquetPredicate::GtEq(col, _)
| ParquetPredicate::IsNull(col)
| ParquetPredicate::IsNotNull(col)
| ParquetPredicate::In(col, _) => Some(col),
_ => None,
}
}
}
/// Options controlling a filtered Parquet read.
#[derive(Debug, Clone)]
pub struct FilterConfig {
    /// Predicate describing which rows are wanted.
    /// NOTE(review): `read_parquet_filtered_chunked` does not currently
    /// apply this predicate while reading — confirm pushdown/filtering
    /// happens elsewhere (e.g. inside `ParquetChunkIterator`).
    pub predicate: ParquetPredicate,
    /// Number of rows per record batch (defaults to 1024 via `new`).
    pub batch_size: usize,
    /// Optional column projection; `None` reads every column.
    pub columns: Option<Vec<String>>,
}
impl FilterConfig {
pub fn new(predicate: ParquetPredicate) -> Self {
Self {
predicate,
batch_size: 1024,
columns: None,
}
}
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
pub fn with_columns(mut self, columns: Vec<String>) -> Self {
self.columns = Some(columns);
self
}
}
/// Read a Parquet file with `config` and return the first decoded chunk.
///
/// # Errors
/// Returns `IoError::ParquetError("No data matched the predicate")` when the
/// file yields no chunks, plus any error raised while opening or decoding.
///
/// NOTE(review): only the first chunk is returned; later chunks are still
/// decoded (so their errors surface) and then discarded. Confirm this is the
/// intended behavior for multi-batch files.
pub fn read_parquet_filtered<P: AsRef<Path>>(path: P, config: FilterConfig) -> Result<ParquetData> {
    let iterator = read_parquet_filtered_chunked(path, config)?;
    // Collect every chunk so a decode error in any batch propagates.
    let chunks: Vec<ParquetData> = iterator.collect::<Result<Vec<_>>>()?;
    // Fold the emptiness check and extraction into one step; the original
    // `is_empty()` guard followed by `expect("Operation failed")` duplicated
    // the check and carried an uninformative panic message.
    chunks
        .into_iter()
        .next()
        .ok_or_else(|| IoError::ParquetError("No data matched the predicate".to_string()))
}
/// Open a Parquet file and build a chunked reader honoring the projection
/// and batch size in `config`.
///
/// # Errors
/// Fails with `IoError::FileError` if the file cannot be opened, or
/// `IoError::ParquetError` if the reader cannot be constructed or a
/// projected column is missing from the schema.
///
/// NOTE(review): `config.predicate` is not consulted here — no row-group
/// skipping or row filtering is pushed down to the reader. Confirm where
/// the predicate is meant to be applied.
pub fn read_parquet_filtered_chunked<P: AsRef<Path>>(
    path: P,
    config: FilterConfig,
) -> Result<ParquetChunkIterator> {
    let path = path.as_ref();
    let file = File::open(path).map_err(|e| {
        IoError::FileError(format!("Failed to open file '{}': {}", path.display(), e))
    })?;
    let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)
        .map_err(|e| IoError::ParquetError(format!("Failed to create Parquet reader: {}", e)))?;
    // Apply an optional column projection, resolving each requested name to
    // its root index in the Arrow schema.
    if let Some(names) = &config.columns {
        let indices = names
            .iter()
            .map(|name| {
                builder
                    .schema()
                    .fields()
                    .iter()
                    .position(|field| field.name() == name)
                    .ok_or_else(|| IoError::ParquetError(format!("Column '{}' not found", name)))
            })
            .collect::<Result<Vec<_>>>()?;
        let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
        builder = builder.with_projection(mask);
    }
    let reader = builder
        .with_batch_size(config.batch_size)
        .build()
        .map_err(|e| IoError::ParquetError(format!("Failed to build reader: {}", e)))?;
    Ok(ParquetChunkIterator::new(reader, config.columns))
}
/// Estimate how effective `predicate` would be for row-group skipping on
/// the Parquet file at `path`.
///
/// NOTE(review): this is currently a stub — `predicate` is never inspected,
/// and the returned analysis always reports zero skippable row groups and a
/// 1.0x speedup. Only `total_row_groups` reflects real file statistics.
pub fn analyze_predicate_effectiveness<P: AsRef<Path>>(
    path: P,
    predicate: &ParquetPredicate,
) -> Result<PredicateAnalysis> {
    // Footer/statistics read only; propagates any I/O or decode error.
    let stats = read_parquet_statistics(path)?;
    let total_row_groups = stats.num_row_groups;
    Ok(PredicateAnalysis {
        total_row_groups,
        // Placeholder values until statistics-based pruning is wired up.
        potentially_skippable: 0, estimated_speedup: 1.0,
    })
}
/// Result of [`analyze_predicate_effectiveness`]: how much of the file a
/// predicate could prune via row-group statistics.
#[derive(Debug, Clone)]
pub struct PredicateAnalysis {
    /// Total number of row groups in the file.
    pub total_row_groups: usize,
    /// Row groups whose statistics prove they can be skipped.
    /// NOTE(review): currently always 0 (analysis not implemented).
    pub potentially_skippable: usize,
    /// Estimated read speedup from skipping (1.0 = none).
    pub estimated_speedup: f64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::write_parquet;
    use scirs2_core::ndarray::Array1;
    use tempfile::tempdir;

    /// Leaf constructors record the target column name.
    #[test]
    fn test_predicate_creation() {
        let pred = ParquetPredicate::gt("temperature", PredicateValue::Float64(20.0));
        assert_eq!(pred.column_name(), Some("temperature"));
        let pred2 = ParquetPredicate::is_null("optional_field");
        assert_eq!(pred2.column_name(), Some("optional_field"));
    }

    /// `and`/`or` wrap their inputs in the matching composite variant.
    #[test]
    fn test_predicate_and_or() {
        let pred1 = ParquetPredicate::gt("temp", PredicateValue::Float64(20.0));
        let pred2 = ParquetPredicate::lt("temp", PredicateValue::Float64(30.0));
        let and_pred = ParquetPredicate::and(vec![pred1.clone(), pred2.clone()]);
        let or_pred = ParquetPredicate::or(vec![pred1, pred2]);
        match and_pred {
            ParquetPredicate::And(preds) => assert_eq!(preds.len(), 2),
            _ => panic!("Expected And predicate"),
        }
        match or_pred {
            ParquetPredicate::Or(preds) => assert_eq!(preds.len(), 2),
            _ => panic!("Expected Or predicate"),
        }
    }

    /// `value > 100` can skip a group whose max is 50, but not one
    /// whose max (150) may contain matching rows.
    #[test]
    fn test_can_skip_row_group() {
        let pred = ParquetPredicate::gt("value", PredicateValue::Float64(100.0));
        assert!(pred.can_skip_row_group_f64(0.0, 50.0));
        assert!(!pred.can_skip_row_group_f64(0.0, 150.0));
    }

    /// Builder setters override the defaults set by `new`.
    #[test]
    fn test_filter_config() {
        let predicate = ParquetPredicate::eq("id", PredicateValue::Int32(42));
        let config = FilterConfig::new(predicate)
            .with_batch_size(5000)
            .with_columns(vec!["id".to_string(), "name".to_string()]);
        assert_eq!(config.batch_size, 5000);
        assert_eq!(
            config
                .columns
                .as_ref()
                .expect("columns were set via with_columns")
                .len(),
            2
        );
    }

    /// Round-trip smoke test: write a small file, then read it back
    /// through the filtered entry point.
    #[test]
    fn test_read_filtered_basic() {
        let dir = tempdir().expect("failed to create temp dir");
        let path = dir.path().join("filtered.parquet");
        let data = Array1::from_vec(vec![10.0, 20.0, 30.0, 40.0, 50.0]);
        write_parquet(&path, &data, Default::default())
            .expect("failed to write test parquet file");
        let predicate = ParquetPredicate::gt("value", PredicateValue::Float64(25.0));
        let config = FilterConfig::new(predicate);
        let result = read_parquet_filtered(&path, config);
        assert!(result.is_ok());
    }

    /// Each `PredicateValue` variant compares equal to an identical literal.
    #[test]
    fn test_predicate_value_types() {
        let v1 = PredicateValue::Float64(std::f64::consts::PI);
        let v2 = PredicateValue::Int32(42);
        let v3 = PredicateValue::Boolean(true);
        let v4 = PredicateValue::String("test".to_string());
        assert_eq!(v1, PredicateValue::Float64(std::f64::consts::PI));
        assert_eq!(v2, PredicateValue::Int32(42));
        assert_eq!(v3, PredicateValue::Boolean(true));
        assert_eq!(v4, PredicateValue::String("test".to_string()));
    }
}