1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use polars_core::utils::concat_df;
use recursive::recursive;
use super::*;
/// Executor that runs each of its input plans and concatenates their outputs vertically.
pub(crate) struct UnionExec {
    /// One executor per union input; their results are concatenated in this order.
    pub(crate) inputs: Vec<Box<dyn Executor>>,
    /// Union settings; `execute` reads `parallel`, `slice` and `rechunk` from it.
    pub(crate) options: UnionOptions,
}
impl Executor for UnionExec {
    /// Executes every input plan and vertically concatenates their results.
    ///
    /// Inputs are run one after another when `options.parallel` is `false`, or when a slice
    /// with a non-negative offset is set. In the sliced case, inputs that end before the
    /// slice offset are dropped, and iteration stops once the slice is filled. Otherwise the
    /// inputs run on the `RAYON` pool and any slice is applied to the concatenated result.
    ///
    /// The inputs are moved out of `self` (`std::mem::take`), so after the first call
    /// `self.inputs` is empty.
    ///
    /// # Errors
    /// Propagates the first error from `state.should_stop()`, from any input executor, or
    /// from concatenating the results.
    #[recursive]
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        state.should_stop()?;
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run UnionExec")
            }
        }
        let mut inputs = std::mem::take(&mut self.inputs);

        // A non-negative offset lets us walk the inputs in order and stop as soon as the
        // slice is complete.
        let sliced_path = matches!(self.options.slice, Some((offset, _)) if offset >= 0);

        let mut df = if !self.options.parallel || sliced_path {
            if state.verbose() {
                if !self.options.parallel {
                    eprintln!("UNION: `parallel=false` union is run sequentially")
                } else {
                    eprintln!("UNION: `slice is set` union is run sequentially")
                }
            }
            let (slice_offset, mut slice_len) = self.options.slice.unwrap_or((0, usize::MAX));
            // Non-negative whenever `sliced_path` is true (checked above); unused otherwise.
            let mut slice_offset = slice_offset as usize;
            let mut dfs = Vec::with_capacity(inputs.len());
            // Zero-row frame from the first skipped input. It is used only if every input
            // lies before the slice offset, so the result keeps the inputs' columns and
            // `concat_df` is never handed an empty list.
            let mut empty_frame: Option<DataFrame> = None;

            for (idx, mut input) in inputs.into_iter().enumerate() {
                let mut state = state.split();
                state.branch_idx += idx;
                let df = input.execute(&mut state)?;
                if !sliced_path {
                    dfs.push(df);
                    continue;
                }
                let height = df.height();
                if slice_offset > height {
                    // This input ends before the slice starts; skip all of its rows.
                    // TODO!: don't read the file yet!
                    slice_offset -= height;
                    if empty_frame.is_none() {
                        empty_frame = Some(df.slice(0, 0));
                    }
                } else if slice_len > height - slice_offset {
                    // The slice starts in this input and continues into the next ones.
                    // Written as `len > height - offset` (no underflow: offset <= height here)
                    // instead of `offset + len > height`, which overflows for huge lengths.
                    slice_len -= height - slice_offset;
                    if slice_offset == 0 {
                        dfs.push(df);
                    } else {
                        dfs.push(df.slice(slice_offset as i64, usize::MAX));
                        slice_offset = 0;
                    }
                } else {
                    // The slice ends within this input; later inputs are not executed.
                    dfs.push(df.slice(slice_offset as i64, slice_len));
                    break;
                }
            }

            if dfs.is_empty() {
                dfs.extend(empty_frame);
            }
            concat_df(&dfs)?
        } else {
            if state.verbose() {
                eprintln!("UNION: union is run in parallel")
            }
            // We don't `par_iter` over all inputs at once: an input plan may itself spawn
            // threads (for instance scan_csv), which could lead to a rayon stack overflow.
            // Processing the inputs in chunks of a multiple of the thread count keeps
            // work stealing within bounds.
            let out = RAYON.install(|| {
                let chunk_size = RAYON.current_num_threads() * 3;
                inputs
                    .chunks_mut(chunk_size)
                    .enumerate()
                    .map(|(chunk_idx, chunk)| {
                        // Global index of this chunk's first input, so `branch_idx` matches
                        // the sequential path and stays distinct across chunks.
                        let chunk_start = chunk_idx * chunk_size;
                        chunk
                            .into_par_iter()
                            .enumerate()
                            .map(|(idx, input)| {
                                let mut input = std::mem::take(input);
                                let mut state = state.split();
                                state.branch_idx += chunk_start + idx;
                                input.execute(&mut state)
                            })
                            .collect::<PolarsResult<Vec<_>>>()
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let df = concat_df(out?.iter().flatten())?;
            match self.options.slice {
                Some((offset, len)) => df.slice(offset, len),
                None => df,
            }
        };

        if self.options.rechunk {
            df.rechunk_mut_par();
        }
        Ok(df)
    }
}