1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::collections::BTreeSet;
use super::*;
/// Optimization rule that replaces column-only projections with simple projections and
/// collapses redundant projection/cache nodes.
///
/// Projection in the physical plan is done by selecting an expression per thread.
/// When there are many projections and columns, this can be expensive if the expressions are
/// simple column selections. Those can be selected on a single thread instead. The single
/// thread is faster because the eager selection algorithm hashes the column names, making the
/// projection complexity linear instead of quadratic.
///
/// It is important that this optimization is run after projection pushdown.
///
/// NOTE(review): the original comment ended mid-sentence here ("The schema reported after
/// this optimization is also"); the intended statement about the reported schema is unknown
/// and should be completed by someone who knows the intent.
pub(super) struct SimpleProjectionAndCollapse {
/// Keeps track of nodes that this rule has already examined, so that checks which can be
/// expensive (schema materialization, for instance) do not have to be repeated for them.
processed: BTreeSet<Node>,
/// When `true`, the `SimpleProjection` and `Cache` collapsing arms of `optimize_plan`
/// are skipped (both are guarded by `!self.eager`); only `Select` nodes are rewritten.
eager: bool,
}
impl SimpleProjectionAndCollapse {
pub(super) fn new(eager: bool) -> Self {
Self {
processed: Default::default(),
eager,
}
}
}
impl OptimizationRule for SimpleProjectionAndCollapse {
    /// Tries to rewrite the plan node `node`, returning the replacement `IR` when a rewrite
    /// applies and `Ok(None)` otherwise.
    ///
    /// The rewrite is chosen by the variant of the node:
    ///
    /// * `Select`: if the input is not an `ExtContext`, the node has not been processed
    ///   before, and every expression is a plain column whose output name equals the column
    ///   name, the node is rebuilt through `IRBuilder::project_simple`.
    /// * `SimpleProjection` (only when not `eager`): merged with a directly preceding
    ///   `SimpleProjection`, or replaced by a clone of its input when it does not change the
    ///   input's schema.
    /// * `Cache` (only when not `eager`): a `Cache` directly on top of another `Cache` is
    ///   replaced by a `Cache` carrying the inner node's `input` and `id`.
    ///
    /// A failure of `project_simple` is not propagated; it results in `Ok(None)`.
    fn optimize_plan(
        &mut self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        node: Node,
    ) -> PolarsResult<Option<IR>> {
        use IR::*;

        let plan = lp_arena.get(node);
        match plan {
            Select { input, expr, .. } => {
                // Nothing to do on top of an `ExtContext` or for a node we already looked at.
                let skip = matches!(lp_arena.get(*input), ExtContext { .. })
                    || self.processed.contains(&node);
                if skip {
                    self.processed.insert(node);
                    return Ok(None);
                }

                // Check applicability first, before allocating the list of names.
                let only_plain_columns = expr.iter().all(|e| {
                    matches!(expr_arena.get(e.node()), AExpr::Column(name) if e.output_name() == name)
                });
                if !only_plain_columns {
                    // Remember the node so this scan is not repeated.
                    self.processed.insert(node);
                    return Ok(None);
                }

                // Owned copies of the names: the builder below needs the arenas mutably,
                // so the borrow of `expr` must end here.
                let column_names: Vec<_> = expr.iter().map(|e| e.output_name().clone()).collect();

                let projected = IRBuilder::new(*input, expr_arena, lp_arena)
                    .project_simple(column_names.iter().cloned());
                // An error from the builder simply means "no rewrite".
                Ok(projected.ok().map(|builder| builder.build()))
            },
            SimpleProjection { columns, input } if !self.eager => {
                match lp_arena.get(*input) {
                    // Two subsequent simple projections: flatten them, keeping only the
                    // columns of the last one.
                    SimpleProjection {
                        input: prev_input, ..
                    } => {
                        let flattened = SimpleProjection {
                            input: *prev_input,
                            columns: columns.clone(),
                        };
                        Ok(Some(flattened))
                    },
                    // Clean up projections that projection pushdown placed just above
                    // caches; they are not needed when the column names line up.
                    cache_lp @ Cache { .. } if self.processed.contains(&node) => {
                        let cache_schema = cache_lp.schema(lp_arena);
                        let names_match = cache_schema.len() == columns.len()
                            && cache_schema
                                .iter_names()
                                .zip(columns.iter_names())
                                .all(|(cached, projected)| cached.as_str() == projected.as_str());
                        Ok(names_match.then(|| cache_lp.clone()))
                    },
                    // A projection that does nothing can be removed.
                    other => {
                        let input_schema = other.schema(lp_arena);
                        // The comparison fails fast if the lengths are not equal.
                        let is_noop = *input_schema.as_ref() == *columns;
                        if !is_noop {
                            self.processed.insert(node);
                            return Ok(None);
                        }
                        Ok(Some(other.clone()))
                    },
                }
            },
            // Two subsequent caches: flatten them and only keep the inner one.
            Cache { input, .. } if !self.eager => match lp_arena.get(*input) {
                Cache {
                    input: prev_input,
                    id,
                } => Ok(Some(Cache {
                    input: *prev_input,
                    id: *id,
                })),
                _ => Ok(None),
            },
            _ => Ok(None),
        }
    }
}