pandrs/optimized/split_dataframe/
parallel.rs

1//! Parallel processing functionality for OptimizedDataFrame
2
3use rayon::prelude::*;
4use std::collections::HashMap;
5
6use super::core::OptimizedDataFrame;
7use crate::column::{BooleanColumn, Column, ColumnTrait, Float64Column, Int64Column, StringColumn};
8use crate::error::{Error, Result};
9
10impl OptimizedDataFrame {
11    /// Parallel row filtering
12    ///
13    /// Extracts only rows where the value in the condition column (Boolean type) is true,
14    /// applying parallel processing for large datasets.
15    ///
16    /// # Arguments
17    /// * `condition_column` - Name of Boolean column to use as filter condition
18    ///
19    /// # Returns
20    /// * `Result<Self>` - A new DataFrame with filtered rows
21    pub fn par_filter(&self, condition_column: &str) -> Result<Self> {
22        // Threshold for optimal parallelization (smaller datasets benefit from serial processing)
23        const PARALLEL_THRESHOLD: usize = 100_000;
24
25        // Get condition column
26        let column_idx = self
27            .column_indices
28            .get(condition_column)
29            .ok_or_else(|| Error::ColumnNotFound(condition_column.to_string()))?;
30
31        let condition = &self.columns[*column_idx];
32
33        // Verify the condition column is boolean type
34        if let Column::Boolean(bool_col) = condition {
35            let row_count = bool_col.len();
36
37            // Choose serial/parallel processing based on data size
38            let indices: Vec<usize> = if row_count < PARALLEL_THRESHOLD {
39                // Serial processing (small data)
40                (0..row_count)
41                    .filter_map(|i| {
42                        if let Ok(Some(true)) = bool_col.get(i) {
43                            Some(i)
44                        } else {
45                            None
46                        }
47                    })
48                    .collect()
49            } else {
50                // Parallel processing (large data)
51                // Optimize chunk size to reduce parallelization overhead
52                let chunk_size = (row_count / rayon::current_num_threads()).max(1000);
53
54                // First convert range to array, then process chunks
55                (0..row_count)
56                    .collect::<Vec<_>>()
57                    .par_chunks(chunk_size)
58                    .flat_map(|chunk| {
59                        chunk
60                            .iter()
61                            .filter_map(|&i| {
62                                if let Ok(Some(true)) = bool_col.get(i) {
63                                    Some(i)
64                                } else {
65                                    None
66                                }
67                            })
68                            .collect::<Vec<_>>()
69                    })
70                    .collect()
71            };
72
73            if indices.is_empty() {
74                // Return empty DataFrame
75                let mut result = Self::new();
76                for name in &self.column_names {
77                    let col_idx = self.column_indices[name];
78                    let empty_col = match &self.columns[col_idx] {
79                        Column::Int64(_) => Column::Int64(Int64Column::new(Vec::new())),
80                        Column::Float64(_) => Column::Float64(Float64Column::new(Vec::new())),
81                        Column::String(_) => Column::String(StringColumn::new(Vec::new())),
82                        Column::Boolean(_) => Column::Boolean(BooleanColumn::new(Vec::new())),
83                    };
84                    result.add_column(name.clone(), empty_col)?;
85                }
86                return Ok(result);
87            }
88
89            // Create new DataFrame
90            let mut result = Self::new();
91
92            // Pre-allocate vector for result columns
93            let mut result_columns = Vec::with_capacity(self.column_names.len());
94
95            // Choose column processing method based on data size
96            if indices.len() < PARALLEL_THRESHOLD || self.column_names.len() < 4 {
97                // Serial processing (small data or few columns)
98                for name in &self.column_names {
99                    let i = self.column_indices[name];
100                    let column = &self.columns[i];
101
102                    let filtered_column = match column {
103                        Column::Int64(col) => {
104                            let filtered_data: Vec<i64> = indices
105                                .iter()
106                                .map(|&idx| {
107                                    if let Ok(Some(val)) = col.get(idx) {
108                                        val
109                                    } else {
110                                        0 // Default value
111                                    }
112                                })
113                                .collect();
114                            Column::Int64(Int64Column::new(filtered_data))
115                        }
116                        Column::Float64(col) => {
117                            let filtered_data: Vec<f64> = indices
118                                .iter()
119                                .map(|&idx| {
120                                    if let Ok(Some(val)) = col.get(idx) {
121                                        val
122                                    } else {
123                                        0.0 // Default value
124                                    }
125                                })
126                                .collect();
127                            Column::Float64(Float64Column::new(filtered_data))
128                        }
129                        Column::String(col) => {
130                            let filtered_data: Vec<String> = indices
131                                .iter()
132                                .map(|&idx| {
133                                    if let Ok(Some(val)) = col.get(idx) {
134                                        val.to_string()
135                                    } else {
136                                        String::new() // Default value
137                                    }
138                                })
139                                .collect();
140                            Column::String(StringColumn::new(filtered_data))
141                        }
142                        Column::Boolean(col) => {
143                            let filtered_data: Vec<bool> = indices
144                                .iter()
145                                .map(|&idx| {
146                                    if let Ok(Some(val)) = col.get(idx) {
147                                        val
148                                    } else {
149                                        false // Default value
150                                    }
151                                })
152                                .collect();
153                            Column::Boolean(BooleanColumn::new(filtered_data))
154                        }
155                    };
156
157                    result_columns.push((name.clone(), filtered_column));
158                }
159            } else {
160                // Parallel processing for large data
161                // Process each column in parallel (coarse-grained parallelism at column level)
162                result_columns = self
163                    .column_names
164                    .par_iter()
165                    .map(|name| {
166                        let i = self.column_indices[name];
167                        let column = &self.columns[i];
168
169                        let indices_len = indices.len();
170                        let filtered_column = match column {
171                            Column::Int64(col) => {
172                                // Split large index list for processing
173                                let chunk_size = (indices_len / 8).max(1000);
174                                let mut filtered_data = Vec::with_capacity(indices_len);
175
176                                // Use chunks to ensure all elements are processed
177                                for chunk in indices.chunks(chunk_size) {
178                                    let chunk_data: Vec<i64> = chunk
179                                        .iter()
180                                        .map(|&idx| {
181                                            if let Ok(Some(val)) = col.get(idx) {
182                                                val
183                                            } else {
184                                                0 // Default value
185                                            }
186                                        })
187                                        .collect();
188                                    filtered_data.extend(chunk_data);
189                                }
190
191                                Column::Int64(Int64Column::new(filtered_data))
192                            }
193                            Column::Float64(col) => {
194                                // Split large index list for processing
195                                let chunk_size = (indices_len / 8).max(1000);
196                                let mut filtered_data = Vec::with_capacity(indices_len);
197
198                                // Use chunks to ensure all elements are processed
199                                for chunk in indices.chunks(chunk_size) {
200                                    let chunk_data: Vec<f64> = chunk
201                                        .iter()
202                                        .map(|&idx| {
203                                            if let Ok(Some(val)) = col.get(idx) {
204                                                val
205                                            } else {
206                                                0.0 // Default value
207                                            }
208                                        })
209                                        .collect();
210                                    filtered_data.extend(chunk_data);
211                                }
212
213                                Column::Float64(Float64Column::new(filtered_data))
214                            }
215                            Column::String(col) => {
216                                // String processing is especially heavy, use finer chunks
217                                let chunk_size = (indices_len / 16).max(500);
218                                let mut filtered_data = Vec::with_capacity(indices_len);
219
220                                // Use chunks to ensure all elements are processed
221                                for chunk in indices.chunks(chunk_size) {
222                                    let chunk_data: Vec<String> = chunk
223                                        .iter()
224                                        .map(|&idx| {
225                                            if let Ok(Some(val)) = col.get(idx) {
226                                                val.to_string()
227                                            } else {
228                                                String::new() // Default value
229                                            }
230                                        })
231                                        .collect();
232                                    filtered_data.extend(chunk_data);
233                                }
234
235                                Column::String(StringColumn::new(filtered_data))
236                            }
237                            Column::Boolean(col) => {
238                                let filtered_data: Vec<bool> = indices
239                                    .iter()
240                                    .map(|&idx| {
241                                        if let Ok(Some(val)) = col.get(idx) {
242                                            val
243                                        } else {
244                                            false // Default value
245                                        }
246                                    })
247                                    .collect();
248                                Column::Boolean(BooleanColumn::new(filtered_data))
249                            }
250                        };
251
252                        (name.clone(), filtered_column)
253                    })
254                    .collect();
255            }
256
257            // Add results to DataFrame
258            for (name, column) in result_columns {
259                result.add_column(name, column)?;
260            }
261
262            // Copy index
263            if let Some(ref idx) = self.index {
264                result.index = Some(idx.clone());
265            }
266
267            Ok(result)
268        } else {
269            Err(Error::OperationFailed(format!(
270                "Column '{}' is not of boolean type",
271                condition_column
272            )))
273        }
274    }
275}