// polars_pipe/operators/chunks.rs

1use polars_core::utils::accumulate_dataframes_vertical_unchecked;
2
3use super::*;
4
/// A unit of data moving through the streaming pipeline: one `DataFrame`
/// tagged with its position in the stream.
#[derive(Clone, Debug)]
pub struct DataChunk {
    /// Position of this chunk within the overall stream; presumably used to
    /// keep chunks ordered downstream — confirm with callers.
    pub chunk_index: IdxSize,
    /// The payload. Invariant (debug-checked in `DataChunk::new`): every
    /// column is backed by exactly one chunk.
    pub data: DataFrame,
}
10
11impl DataChunk {
12    pub(crate) fn new(chunk_index: IdxSize, data: DataFrame) -> Self {
13        // Check the invariant that all columns have a single chunk.
14        #[cfg(debug_assertions)]
15        {
16            for c in data.get_columns() {
17                assert_eq!(c.as_materialized_series().chunks().len(), 1);
18            }
19        }
20        Self { chunk_index, data }
21    }
22    pub(crate) fn with_data(&self, data: DataFrame) -> Self {
23        Self::new(self.chunk_index, data)
24    }
25    pub(crate) fn is_empty(&self) -> bool {
26        self.data.is_empty()
27    }
28}
29
30pub(crate) fn chunks_to_df_unchecked(chunks: Vec<DataChunk>) -> DataFrame {
31    accumulate_dataframes_vertical_unchecked(chunks.into_iter().map(|c| c.data))
32}
33
/// Combine a series of `DataFrame`s, and if they're small enough, combine them
/// into larger `DataFrame`s using `vstack`. This allows the caller to turn them
/// into contiguous memory allocations so that we don't suffer from overhead of
/// many small writes. The assumption is that added `DataFrame`s are already in
/// the correct order, and can therefore be combined.
///
/// The benefit of having a series of `DataFrame` that are e.g. 4MB each that
/// are then made contiguous is that you're not using a lot of memory (an extra
/// 4MB), but you're still doing better than if you had a series of 2KB
/// `DataFrame`s.
///
/// Changing the `DataFrame` into contiguous chunks is the caller's
/// responsibility.
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
#[derive(Clone)]
pub(crate) struct StreamingVstacker {
    /// Accumulator: frames passed to `add` are vstacked onto this until it
    /// grows past `output_chunk_size`, at which point it is flushed out.
    current_dataframe: Option<DataFrame>,
    /// How big should resulting chunks be, if possible?
    output_chunk_size: usize,
}
54
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
impl StreamingVstacker {
    /// Create a new instance with the given target output chunk size, in
    /// estimated bytes.
    pub fn new(output_chunk_size: usize) -> Self {
        Self {
            current_dataframe: None,
            output_chunk_size,
        }
    }

    /// Add another `DataFrame`, returning any (potentially combined)
    /// `DataFrame`s that are ready to be emitted as a result.
    pub fn add(&mut self, next_frame: DataFrame) -> impl Iterator<Item = DataFrame> {
        // At most two frames can be emitted per call: a pre-flush of the
        // accumulator, and a post-flush once the new data is stacked on.
        let mut ready: Vec<DataFrame> = Vec::with_capacity(2);

        // If the next chunk is too large, we probably don't want to make
        // copies of it if a caller does as_single_chunk(), so we flush the
        // accumulated data in advance.
        if self.current_dataframe.is_some()
            && next_frame.estimated_size() > self.output_chunk_size / 4
        {
            ready.extend(self.flush());
        }

        match self.current_dataframe.as_mut() {
            Some(accumulated) => {
                accumulated
                    .vstack_mut(&next_frame)
                    .expect("These are chunks from the same dataframe");
            },
            None => self.current_dataframe = Some(next_frame),
        }

        // Once the accumulated data exceeds the target size, emit it.
        if self.current_dataframe.as_ref().unwrap().estimated_size() > self.output_chunk_size {
            ready.extend(self.flush());
        }
        ready.into_iter()
    }

    /// Clear and return any cached `DataFrame` data.
    #[must_use]
    fn flush(&mut self) -> Option<DataFrame> {
        self.current_dataframe.take()
    }

    /// Finish and return any remaining cached `DataFrame` data. The only way
    /// that `StreamingVstacker` should be cleaned up.
    #[must_use]
    pub fn finish(mut self) -> Option<DataFrame> {
        self.flush()
    }
}
105
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
impl Default for StreamingVstacker {
    /// 4 MB was chosen based on some empirical experiments that showed it to
    /// be decently faster than lower or higher values, and it's small enough
    /// it won't impact memory usage significantly.
    fn default() -> Self {
        const DEFAULT_OUTPUT_CHUNK_SIZE: usize = 4 * 1024 * 1024;
        Self::new(DEFAULT_OUTPUT_CHUNK_SIZE)
    }
}
115
#[cfg(test)]
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
mod test {
    use super::*;

    /// DataFrames get merged into chunks that are bigger than the specified
    /// size when possible.
    #[test]
    fn semicontiguous_vstacker_merges() {
        let test = semicontiguous_vstacker_merges_impl;
        test(vec![10]);
        test(vec![10, 10, 10, 10, 10, 10, 10]);
        test(vec![10, 40, 10, 10, 10, 10]);
        test(vec![40, 10, 10, 40, 10, 10, 40]);
        test(vec![50, 50, 50]);
    }

    /// Drive a `StreamingVstacker` with frames of the given row counts and
    /// check the emitted chunks. Eventually would be nice to drive this with
    /// proptest.
    fn semicontiguous_vstacker_merges_impl(df_lengths: Vec<usize>) {
        // Convert the lengths into a series of DataFrames:
        let mut vstacker = StreamingVstacker::new(4096);
        let dfs: Vec<DataFrame> = df_lengths
            .iter()
            .enumerate()
            .map(|(i, length)| {
                // Each frame has a single u64 column whose value marks which
                // input frame the rows came from.
                let series = Column::new("val".into(), vec![i as u64; *length]);
                DataFrame::new(vec![series]).unwrap()
            })
            .collect();

        // Combine the DataFrames using a StreamingVstacker, recording the
        // index of the input frame that triggered each emission:
        let mut results = vec![];
        for (i, df) in dfs.iter().enumerate() {
            for mut result_df in vstacker.add(df.clone()) {
                result_df.as_single_chunk();
                results.push((i, result_df));
            }
        }
        if let Some(mut result_df) = vstacker.finish() {
            result_df.as_single_chunk();
            results.push((df_lengths.len() - 1, result_df));
        }

        // Make sure the lengths are as sufficiently large, and the chunks
        // were merged, the whole point of the exercise:
        for (original_idx, result_df) in &results {
            if result_df.height() < 40 {
                // This means either this was the last df, or the next one
                // was big enough we decided not to aggregate.
                // NOTE(review): this compares an index into `dfs` against
                // `results.len()`; it looks like `dfs.len()` may have been
                // intended — confirm before relying on this bound.
                if *original_idx < results.len() - 1 {
                    assert!(dfs[original_idx + 1].height() > 10);
                }
            }
            // Make sure all result DataFrames only have a single chunk.
            assert_eq!(
                result_df.get_columns()[0]
                    .as_materialized_series()
                    .chunk_lengths()
                    .len(),
                1
            );
        }

        // Make sure the data was preserved:
        assert_eq!(
            accumulate_dataframes_vertical_unchecked(dfs.into_iter()),
            accumulate_dataframes_vertical_unchecked(results.into_iter().map(|(_, df)| df)),
        );
    }
}
185}