polars_pipe/operators/
chunks.rs1use polars_core::utils::accumulate_dataframes_vertical_unchecked;
2
3use super::*;
4
5#[derive(Clone, Debug)]
6pub struct DataChunk {
7 pub chunk_index: IdxSize,
8 pub data: DataFrame,
9}
10
11impl DataChunk {
12 pub(crate) fn new(chunk_index: IdxSize, data: DataFrame) -> Self {
13 #[cfg(debug_assertions)]
15 {
16 for c in data.get_columns() {
17 assert_eq!(c.as_materialized_series().chunks().len(), 1);
18 }
19 }
20 Self { chunk_index, data }
21 }
22 pub(crate) fn with_data(&self, data: DataFrame) -> Self {
23 Self::new(self.chunk_index, data)
24 }
25 pub(crate) fn is_empty(&self) -> bool {
26 self.data.is_empty()
27 }
28}
29
30pub(crate) fn chunks_to_df_unchecked(chunks: Vec<DataChunk>) -> DataFrame {
31 accumulate_dataframes_vertical_unchecked(chunks.into_iter().map(|c| c.data))
32}
33
/// Incrementally combines a stream of `DataFrame`s into larger ones using `vstack_mut`.
///
/// Frames handed to [`StreamingVstacker::add`] are assumed to arrive in order and to
/// come from the same `DataFrame` (same schema), so they can simply be stacked. The
/// accumulated frame is handed back once its `estimated_size` exceeds
/// `output_chunk_size`, or early when an incoming frame is large enough that merging
/// it is not worthwhile.
///
/// Emitted frames may still consist of several chunks; making them contiguous
/// (e.g. with `as_single_chunk`) is the caller's responsibility.
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
#[derive(Clone)]
pub(crate) struct StreamingVstacker {
    /// Frame accumulated so far that has not yet been handed back to the caller.
    current_dataframe: Option<DataFrame>,
    /// Target size of emitted frames, compared against `DataFrame::estimated_size`.
    output_chunk_size: usize,
}

#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
impl StreamingVstacker {
    /// Creates an empty vstacker aiming for emitted frames of about `output_chunk_size`
    /// (as measured by `DataFrame::estimated_size`).
    pub fn new(output_chunk_size: usize) -> Self {
        Self {
            current_dataframe: None,
            output_chunk_size,
        }
    }

    /// Adds `next_frame` and returns the frames (zero, one or two) that are ready to be
    /// passed on, in input order.
    ///
    /// # Panics
    ///
    /// Panics if `next_frame` cannot be vertically stacked onto the frame accumulated so
    /// far (i.e. it does not come from the same `DataFrame`).
    pub fn add(&mut self, next_frame: DataFrame) -> impl Iterator<Item = DataFrame> {
        let mut result: [Option<DataFrame>; 2] = [None, None];

        // A large incoming frame is not worth copying into the accumulated one when the
        // caller later makes it contiguous, so hand back what we have before appending.
        if self.current_dataframe.is_some()
            && next_frame.estimated_size() > self.output_chunk_size / 4
        {
            result[0] = self.flush();
        }

        // Append to the accumulated frame (or start a new one) and measure the outcome in
        // the same step, so no `unwrap` on the just-populated `Option` is needed.
        let current_size = if let Some(current_frame) = self.current_dataframe.as_mut() {
            current_frame
                .vstack_mut(&next_frame)
                .expect("These are chunks from the same dataframe");
            current_frame.estimated_size()
        } else {
            let size = next_frame.estimated_size();
            self.current_dataframe = Some(next_frame);
            size
        };

        // Big enough: hand it back now.
        if current_size > self.output_chunk_size {
            result[1] = self.flush();
        }
        result.into_iter().flatten()
    }

    /// Takes the accumulated frame, if any, leaving the vstacker empty.
    #[must_use]
    fn flush(&mut self) -> Option<DataFrame> {
        self.current_dataframe.take()
    }

    /// Consumes the vstacker and returns the frame still being accumulated, if any.
    ///
    /// Must be called once all input has been passed to [`StreamingVstacker::add`], or
    /// the trailing data is lost.
    #[must_use]
    pub fn finish(mut self) -> Option<DataFrame> {
        self.flush()
    }
}
105
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
impl Default for StreamingVstacker {
    /// Creates a vstacker whose target output size is `4 * 1024 * 1024`
    /// (4 MiB, assuming `estimated_size` is in bytes).
    fn default() -> Self {
        const DEFAULT_OUTPUT_CHUNK_SIZE: usize = 4 * 1024 * 1024;
        Self::new(DEFAULT_OUTPUT_CHUNK_SIZE)
    }
}
115
#[cfg(test)]
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
mod test {
    use super::*;

    /// Small `DataFrame`s get merged, every frame emitted by `add` is justified by one of
    /// its two flush rules, output is single-chunk, and the data is preserved in order.
    #[test]
    fn semicontiguous_vstacker_merges() {
        // 4096 is far above the size of these inputs, so everything is merged until
        // `finish`. 400 is small enough that both flush rules of `add` fire for some of
        // the inputs (each row is a `u64`), so the invariant check below is exercised.
        for output_chunk_size in [4096, 400] {
            let test = |df_lengths: Vec<usize>| {
                semicontiguous_vstacker_merges_impl(df_lengths, output_chunk_size)
            };
            test(vec![10]);
            test(vec![10, 10, 10, 10, 10, 10, 10]);
            test(vec![10, 40, 10, 10, 10, 10]);
            test(vec![40, 10, 10, 40, 10, 10, 40]);
            test(vec![50, 50, 50]);
        }
    }

    fn semicontiguous_vstacker_merges_impl(df_lengths: Vec<usize>, output_chunk_size: usize) {
        let mut vstacker = StreamingVstacker::new(output_chunk_size);

        // Setup: one single-column `DataFrame` per requested length.
        let dfs: Vec<DataFrame> = df_lengths
            .iter()
            .enumerate()
            .map(|(i, length)| {
                let series = Column::new("val".into(), vec![i as u64; *length]);
                DataFrame::new(vec![series]).unwrap()
            })
            .collect();

        // Combine the `DataFrame`s using the `StreamingVstacker`.
        let mut results = vec![];
        for df in &dfs {
            for mut result_df in vstacker.add(df.clone()) {
                // `add` only emits a frame before `finish` if the frame being added was
                // large (flush before appending) or the accumulated frame outgrew the
                // target (flush after appending); otherwise it should keep merging.
                assert!(
                    df.estimated_size() > output_chunk_size / 4
                        || result_df.estimated_size() > output_chunk_size
                );
                result_df.as_single_chunk();
                results.push(result_df);
            }
        }
        if let Some(mut result_df) = vstacker.finish() {
            result_df.as_single_chunk();
            results.push(result_df);
        }

        // Make sure all result DataFrames only have a single chunk.
        for result_df in &results {
            assert_eq!(
                result_df.get_columns()[0]
                    .as_materialized_series()
                    .chunk_lengths()
                    .len(),
                1
            );
        }

        // Make sure the data was preserved, in order.
        assert_eq!(
            accumulate_dataframes_vertical_unchecked(dfs.into_iter()),
            accumulate_dataframes_vertical_unchecked(results.into_iter()),
        );
    }
}