polars_io/parquet/read/
predicates.rs

1use arrow::array::{MutablePrimitiveArray, PrimitiveArray};
2use arrow::pushable::Pushable;
3use polars_core::prelude::*;
4use polars_parquet::read::RowGroupMetadata;
5use polars_parquet::read::statistics::{ArrowColumnStatisticsArrays, deserialize_all};
6
7/// Collect the statistics in a row-group
8pub fn collect_statistics_with_live_columns(
9    row_groups: &[RowGroupMetadata],
10    schema: &ArrowSchema,
11    live_columns: &PlIndexSet<PlSmallStr>,
12    row_index: Option<(&PlSmallStr, IdxSize)>,
13) -> PolarsResult<Vec<Option<ArrowColumnStatisticsArrays>>> {
14    if row_groups.is_empty() {
15        return Ok((0..live_columns.len()).map(|_| None).collect());
16    }
17
18    let md = &row_groups[0];
19
20    live_columns
21        .iter()
22        .map(|c| {
23            let Some(field) = schema.get(c) else {
24                // Should be the row index column
25
26                let Some((name, mut offset)) = row_index else {
27                    if cfg!(debug_assertions) {
28                        panic!()
29                    }
30                    return Ok(None);
31                };
32
33                if c != name {
34                    if cfg!(debug_assertions) {
35                        panic!()
36                    }
37                    return Ok(None);
38                }
39
40                let null_count =
41                    PrimitiveArray::<IdxSize>::full(row_groups.len(), 0, ArrowDataType::IDX_DTYPE);
42
43                let mut distinct_count =
44                    MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());
45                let mut min_value =
46                    MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());
47                let mut max_value =
48                    MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());
49
50                for rg in row_groups.iter() {
51                    let n_rows = IdxSize::try_from(rg.num_rows()).unwrap_or(IdxSize::MAX);
52                    distinct_count.push_value(n_rows);
53
54                    if offset.checked_add(n_rows).is_none() {
55                        min_value.push_null();
56                        max_value.push_null();
57                        continue;
58                    }
59
60                    if n_rows == 0 {
61                        min_value.push_null();
62                        max_value.push_null();
63                    } else {
64                        min_value.push_value(offset);
65                        max_value.push_value(offset + n_rows - 1);
66                    }
67
68                    offset = offset.saturating_add(n_rows);
69                }
70
71                let out = ArrowColumnStatisticsArrays {
72                    null_count,
73                    distinct_count: distinct_count.freeze(),
74                    min_value: min_value.freeze().boxed(),
75                    max_value: max_value.freeze().boxed(),
76                };
77
78                return Ok(Some(out));
79            };
80
81            // This can be None in the allow_missing_columns case.
82            let Some(idxs) = md.columns_idxs_under_root_iter(&field.name) else {
83                return Ok(None);
84            };
85
86            // 0 is possible for possible for empty structs.
87            //
88            // 2+ is for structs. We don't support reading nested statistics for now. It does not
89            // really make any sense at the moment with how we structure statistics.
90            if idxs.is_empty() || idxs.len() > 1 {
91                return Ok(None);
92            }
93
94            let idx = idxs[0];
95            Ok(deserialize_all(field, row_groups, idx)?)
96        })
97        .collect::<PolarsResult<Vec<_>>>()
98}