1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//! Prune `FileScanIR::Parquet`'s pre-decoded metadata (`first_metadata`
//! and `metadata_per_source`) to projected + predicate columns.
//!
//! Runs at the end of the optimizer pipeline, after projection and predicate
//! pushdown have resolved each scan's projection / predicate. Walks the IR
//! arena and replaces each entry via `FileMetadata::pruned`.
//!
//! Gated on `POLARS_PRUNE_PARQUET_METADATA=1` (default off; single-node
//! execution gets no benefit since metadata never crosses a wire).
#[cfg(feature = "parquet")]
use std::sync::Arc;
#[cfg(feature = "parquet")]
use polars_io::parquet::metadata::FileMetadataRef;
use polars_utils::arena::{Arena, Node};
#[cfg(feature = "parquet")]
use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "parquet")]
use polars_utils::unitvec;
#[cfg(feature = "parquet")]
use crate::dsl::FileScanIR;
#[cfg(feature = "parquet")]
use crate::plans::AExpr;
use crate::plans::IR;
#[cfg(feature = "parquet")]
use crate::utils::aexpr_to_leaf_names_iter;
#[cfg(feature = "parquet")]
pub(super) fn prune_parquet_metadata(
root: Node,
ir_arena: &mut Arena<IR>,
expr_arena: &Arena<AExpr>,
) {
// Gate on env var; default off (single-node has no benefit).
if !polars_config::config().prune_parquet_metadata() {
return;
}
let mut stack = unitvec![root];
while let Some(node) = stack.pop() {
ir_arena.get(node).copy_inputs(&mut stack);
let IR::Scan {
scan_type,
unified_scan_args,
predicate,
..
} = ir_arena.get_mut(node)
else {
continue;
};
// Variant check first: skip the projection/predicate work entirely
// for non-Parquet scans (CSV, IPC, NDJson, Python, Anonymous).
let FileScanIR::Parquet {
first_metadata,
metadata_per_source,
..
} = scan_type.as_mut()
else {
continue;
};
let Some(projection) = unified_scan_args.projection.clone() else {
continue;
};
// Predicate cols are a subset of the projection by this point:
// projection pushdown adds predicate-referenced cols to the scan
// (else the pushed-down predicate would `ColumnNotFound` at execute).
let predicate_cols: Vec<PlSmallStr> = predicate
.as_ref()
.map(|pred_ir| {
aexpr_to_leaf_names_iter(pred_ir.node(), expr_arena)
.cloned()
.collect()
})
.unwrap_or_default();
// Pruning failure (chunks-vs-leaves desync inside `from_compact`) is
// recoverable; fall back to unpruned metadata so the query still runs.
// The wire form just stays larger for this entry. Swap-only-if-smaller
// (leaf count) avoids allocating a new inner Arc when nothing changed.
let prune_one = |m: &FileMetadataRef| -> FileMetadataRef {
m.pruned(&projection, &predicate_cols)
.ok()
.filter(|p| p.schema_descr.columns().len() < m.schema_descr.columns().len())
.map(Arc::new)
.unwrap_or_else(|| m.clone())
};
*first_metadata = first_metadata.as_ref().map(prune_one);
*metadata_per_source = metadata_per_source
.as_ref()
.map(|s| s.iter().map(prune_one).collect());
}
}
#[cfg(not(feature = "parquet"))]
pub(super) fn prune_parquet_metadata(
_root: Node,
_ir_arena: &mut Arena<IR>,
_expr_arena: &Arena<crate::plans::AExpr>,
) {
// No-op when parquet feature is disabled.
}