// polars_io/parquet/read/predicates.rs

1use polars_core::config;
2use polars_core::prelude::*;
3use polars_parquet::read::statistics::{deserialize, Statistics};
4use polars_parquet::read::RowGroupMetadata;
5
6use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};
7
8/// Collect the statistics in a row-group
9pub(crate) fn collect_statistics(
10    md: &RowGroupMetadata,
11    schema: &ArrowSchema,
12) -> PolarsResult<Option<BatchStats>> {
13    // TODO! fix this performance. This is a full sequential scan.
14    let stats = schema
15        .iter_values()
16        .map(|field| {
17            let mut iter = md.columns_under_root_iter(&field.name).unwrap();
18
19            let statistics = deserialize(field, &mut iter)?;
20            assert!(iter.next().is_none());
21
22            // We don't support reading nested statistics for now. It does not really make any
23            // sense at the moment with how we structure statistics.
24            let Some(Statistics::Column(stats)) = statistics else {
25                return Ok(ColumnStats::new(field.into(), None, None, None));
26            };
27
28            let stats = stats.into_arrow()?;
29
30            let null_count = stats
31                .null_count
32                .map(|x| Scalar::from(x).into_series(PlSmallStr::EMPTY));
33            let min_value = stats
34                .min_value
35                .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
36            let max_value = stats
37                .max_value
38                .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
39
40            Ok(ColumnStats::new(
41                field.into(),
42                null_count,
43                min_value,
44                max_value,
45            ))
46        })
47        .collect::<PolarsResult<Vec<_>>>()?;
48
49    if stats.is_empty() {
50        return Ok(None);
51    }
52
53    Ok(Some(BatchStats::new(
54        Arc::new(Schema::from_arrow_schema(schema)),
55        stats,
56        Some(md.num_rows()),
57    )))
58}
59
60pub fn read_this_row_group(
61    predicate: Option<&dyn PhysicalIoExpr>,
62    md: &RowGroupMetadata,
63    schema: &ArrowSchema,
64) -> PolarsResult<bool> {
65    if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
66        return Ok(true);
67    }
68
69    let mut should_read = true;
70
71    if let Some(pred) = predicate {
72        if let Some(pred) = pred.as_stats_evaluator() {
73            if let Some(stats) = collect_statistics(md, schema)? {
74                let pred_result = pred.should_read(&stats);
75
76                // a parquet file may not have statistics of all columns
77                match pred_result {
78                    Err(PolarsError::ColumnNotFound(errstr)) => {
79                        return Err(PolarsError::ColumnNotFound(errstr))
80                    },
81                    Ok(false) => should_read = false,
82                    _ => {},
83                }
84            }
85        }
86
87        if config::verbose() {
88            if should_read {
89                eprintln!(
90                    "parquet row group must be read, statistics not sufficient for predicate."
91                );
92            } else {
93                eprintln!("parquet row group can be skipped, the statistics were sufficient to apply the predicate.");
94            }
95        }
96    }
97
98    Ok(should_read)
99}