pandrs/optimized/split_dataframe/parallel.rs
//! Parallel processing functionality for `OptimizedDataFrame`.
2
3use rayon::prelude::*;
4use std::collections::HashMap;
5
6use super::core::OptimizedDataFrame;
7use crate::column::{BooleanColumn, Column, ColumnTrait, Float64Column, Int64Column, StringColumn};
8use crate::error::{Error, Result};
9
10impl OptimizedDataFrame {
11 /// Parallel row filtering
12 ///
13 /// Extracts only rows where the value in the condition column (Boolean type) is true,
14 /// applying parallel processing for large datasets.
15 ///
16 /// # Arguments
17 /// * `condition_column` - Name of Boolean column to use as filter condition
18 ///
19 /// # Returns
20 /// * `Result<Self>` - A new DataFrame with filtered rows
21 pub fn par_filter(&self, condition_column: &str) -> Result<Self> {
22 // Threshold for optimal parallelization (smaller datasets benefit from serial processing)
23 const PARALLEL_THRESHOLD: usize = 100_000;
24
25 // Get condition column
26 let column_idx = self
27 .column_indices
28 .get(condition_column)
29 .ok_or_else(|| Error::ColumnNotFound(condition_column.to_string()))?;
30
31 let condition = &self.columns[*column_idx];
32
33 // Verify the condition column is boolean type
34 if let Column::Boolean(bool_col) = condition {
35 let row_count = bool_col.len();
36
37 // Choose serial/parallel processing based on data size
38 let indices: Vec<usize> = if row_count < PARALLEL_THRESHOLD {
39 // Serial processing (small data)
40 (0..row_count)
41 .filter_map(|i| {
42 if let Ok(Some(true)) = bool_col.get(i) {
43 Some(i)
44 } else {
45 None
46 }
47 })
48 .collect()
49 } else {
50 // Parallel processing (large data)
51 // Optimize chunk size to reduce parallelization overhead
52 let chunk_size = (row_count / rayon::current_num_threads()).max(1000);
53
54 // First convert range to array, then process chunks
55 (0..row_count)
56 .collect::<Vec<_>>()
57 .par_chunks(chunk_size)
58 .flat_map(|chunk| {
59 chunk
60 .iter()
61 .filter_map(|&i| {
62 if let Ok(Some(true)) = bool_col.get(i) {
63 Some(i)
64 } else {
65 None
66 }
67 })
68 .collect::<Vec<_>>()
69 })
70 .collect()
71 };
72
73 if indices.is_empty() {
74 // Return empty DataFrame
75 let mut result = Self::new();
76 for name in &self.column_names {
77 let col_idx = self.column_indices[name];
78 let empty_col = match &self.columns[col_idx] {
79 Column::Int64(_) => Column::Int64(Int64Column::new(Vec::new())),
80 Column::Float64(_) => Column::Float64(Float64Column::new(Vec::new())),
81 Column::String(_) => Column::String(StringColumn::new(Vec::new())),
82 Column::Boolean(_) => Column::Boolean(BooleanColumn::new(Vec::new())),
83 };
84 result.add_column(name.clone(), empty_col)?;
85 }
86 return Ok(result);
87 }
88
89 // Create new DataFrame
90 let mut result = Self::new();
91
92 // Pre-allocate vector for result columns
93 let mut result_columns = Vec::with_capacity(self.column_names.len());
94
95 // Choose column processing method based on data size
96 if indices.len() < PARALLEL_THRESHOLD || self.column_names.len() < 4 {
97 // Serial processing (small data or few columns)
98 for name in &self.column_names {
99 let i = self.column_indices[name];
100 let column = &self.columns[i];
101
102 let filtered_column = match column {
103 Column::Int64(col) => {
104 let filtered_data: Vec<i64> = indices
105 .iter()
106 .map(|&idx| {
107 if let Ok(Some(val)) = col.get(idx) {
108 val
109 } else {
110 0 // Default value
111 }
112 })
113 .collect();
114 Column::Int64(Int64Column::new(filtered_data))
115 }
116 Column::Float64(col) => {
117 let filtered_data: Vec<f64> = indices
118 .iter()
119 .map(|&idx| {
120 if let Ok(Some(val)) = col.get(idx) {
121 val
122 } else {
123 0.0 // Default value
124 }
125 })
126 .collect();
127 Column::Float64(Float64Column::new(filtered_data))
128 }
129 Column::String(col) => {
130 let filtered_data: Vec<String> = indices
131 .iter()
132 .map(|&idx| {
133 if let Ok(Some(val)) = col.get(idx) {
134 val.to_string()
135 } else {
136 String::new() // Default value
137 }
138 })
139 .collect();
140 Column::String(StringColumn::new(filtered_data))
141 }
142 Column::Boolean(col) => {
143 let filtered_data: Vec<bool> = indices
144 .iter()
145 .map(|&idx| {
146 if let Ok(Some(val)) = col.get(idx) {
147 val
148 } else {
149 false // Default value
150 }
151 })
152 .collect();
153 Column::Boolean(BooleanColumn::new(filtered_data))
154 }
155 };
156
157 result_columns.push((name.clone(), filtered_column));
158 }
159 } else {
160 // Parallel processing for large data
161 // Process each column in parallel (coarse-grained parallelism at column level)
162 result_columns = self
163 .column_names
164 .par_iter()
165 .map(|name| {
166 let i = self.column_indices[name];
167 let column = &self.columns[i];
168
169 let indices_len = indices.len();
170 let filtered_column = match column {
171 Column::Int64(col) => {
172 // Split large index list for processing
173 let chunk_size = (indices_len / 8).max(1000);
174 let mut filtered_data = Vec::with_capacity(indices_len);
175
176 // Use chunks to ensure all elements are processed
177 for chunk in indices.chunks(chunk_size) {
178 let chunk_data: Vec<i64> = chunk
179 .iter()
180 .map(|&idx| {
181 if let Ok(Some(val)) = col.get(idx) {
182 val
183 } else {
184 0 // Default value
185 }
186 })
187 .collect();
188 filtered_data.extend(chunk_data);
189 }
190
191 Column::Int64(Int64Column::new(filtered_data))
192 }
193 Column::Float64(col) => {
194 // Split large index list for processing
195 let chunk_size = (indices_len / 8).max(1000);
196 let mut filtered_data = Vec::with_capacity(indices_len);
197
198 // Use chunks to ensure all elements are processed
199 for chunk in indices.chunks(chunk_size) {
200 let chunk_data: Vec<f64> = chunk
201 .iter()
202 .map(|&idx| {
203 if let Ok(Some(val)) = col.get(idx) {
204 val
205 } else {
206 0.0 // Default value
207 }
208 })
209 .collect();
210 filtered_data.extend(chunk_data);
211 }
212
213 Column::Float64(Float64Column::new(filtered_data))
214 }
215 Column::String(col) => {
216 // String processing is especially heavy, use finer chunks
217 let chunk_size = (indices_len / 16).max(500);
218 let mut filtered_data = Vec::with_capacity(indices_len);
219
220 // Use chunks to ensure all elements are processed
221 for chunk in indices.chunks(chunk_size) {
222 let chunk_data: Vec<String> = chunk
223 .iter()
224 .map(|&idx| {
225 if let Ok(Some(val)) = col.get(idx) {
226 val.to_string()
227 } else {
228 String::new() // Default value
229 }
230 })
231 .collect();
232 filtered_data.extend(chunk_data);
233 }
234
235 Column::String(StringColumn::new(filtered_data))
236 }
237 Column::Boolean(col) => {
238 let filtered_data: Vec<bool> = indices
239 .iter()
240 .map(|&idx| {
241 if let Ok(Some(val)) = col.get(idx) {
242 val
243 } else {
244 false // Default value
245 }
246 })
247 .collect();
248 Column::Boolean(BooleanColumn::new(filtered_data))
249 }
250 };
251
252 (name.clone(), filtered_column)
253 })
254 .collect();
255 }
256
257 // Add results to DataFrame
258 for (name, column) in result_columns {
259 result.add_column(name, column)?;
260 }
261
262 // Copy index
263 if let Some(ref idx) = self.index {
264 result.index = Some(idx.clone());
265 }
266
267 Ok(result)
268 } else {
269 Err(Error::OperationFailed(format!(
270 "Column '{}' is not of boolean type",
271 condition_column
272 )))
273 }
274 }
275}