1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use std::collections::BTreeSet;
use super::*;
/// Optimization rule that clears the `rechunk` flag on the first `Scan` or
/// `Union` reached below a single-key `GroupBy`, so rechunking is not done
/// before the aggregation's partitioning.
#[derive(Default)]
pub(super) struct DelayRechunk {
    /// Arena indices (`Node.0`) of `GroupBy` nodes already visited; each one
    /// is handled at most once.
    processed: BTreeSet<usize>,
}

impl DelayRechunk {
    /// Creates the rule with an empty set of visited nodes.
    pub(super) fn new() -> Self {
        Self {
            processed: BTreeSet::new(),
        }
    }
}
impl OptimizationRule for DelayRechunk {
fn optimize_plan(
&mut self,
lp_arena: &mut Arena<IR>,
_expr_arena: &mut Arena<AExpr>,
node: Node,
) -> PolarsResult<Option<IR>> {
match lp_arena.get(node) {
// An aggregation can be partitioned, its wasteful to rechunk before that partition.
#[allow(unused_mut)]
IR::GroupBy { input, keys, .. } => {
// Multiple keys on multiple chunks is much slower, so rechunk.
if !self.processed.insert(node.0) || keys.len() > 1 {
return Ok(None);
};
use IR::*;
let mut input_node = None;
for (node, lp) in lp_arena.iter(*input) {
match lp {
Scan { .. } => {
input_node = Some(node);
break;
},
Union { .. } => {
input_node = Some(node);
break;
},
// don't delay rechunk if there is a join first
Join { .. } => break,
_ => {},
}
}
if let Some(node) = input_node {
match lp_arena.get_mut(node) {
Scan {
unified_scan_args, ..
} => {
unified_scan_args.rechunk = false;
},
Union { options, .. } => {
options.rechunk = false;
},
_ => unreachable!(),
}
};
Ok(None)
},
_ => Ok(None),
}
}
}