pub struct LazyFrame { /* private fields */ }
Expand description
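A wrapper around an `OptimizedDataFrame` for lazy evaluation: operations such as `select`, `filter`, and `aggregate` are chained on the `LazyFrame`, and the resulting DataFrame is produced by calling `execute()`.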
Implementations§
Source · § impl LazyFrame
impl LazyFrame
Source · pub fn new(df: OptimizedDataFrame) -> Self
pub fn new(df: OptimizedDataFrame) -> Self
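Creates a new `LazyFrame` from the given `OptimizedDataFrame`. The DataFrame is taken by value, so clone it first if it is still needed afterwards (as the `parallel_benchmark.rs` example does with `small_df.clone()`).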
Examples found in repository?
examples/optimized_stats_example.rs (line 182)
115fn optimized_regression_example() -> Result<()> {
116 println!("3. Example of Regression Analysis with Optimized Implementation");
117 println!("--------------------------");
118
119 // Create an optimized DataFrame
120 let mut opt_df = OptimizedDataFrame::new();
121
122 // Add explanatory variables as type-specialized columns
123 let x1: Vec<f64> = (1..=10).map(|i| i as f64).collect();
124 let x2: Vec<f64> = (1..=10).map(|i| 5.0 + 3.0 * i as f64).collect();
125
126 opt_df.add_column(
127 "x1".to_string(),
128 pandrs::Column::Float64(pandrs::column::Float64Column::new(x1.clone())),
129 )?;
130 opt_df.add_column(
131 "x2".to_string(),
132 pandrs::Column::Float64(pandrs::column::Float64Column::new(x2.clone())),
133 )?;
134
135 // Dependent variable (y = 2*x1 + 1.5*x2 + 3 + noise)
136 let mut y_values = Vec::with_capacity(10);
137 let mut rng = rng();
138
139 for i in 0..10 {
140 let noise = rng.random_range(-1.0..1.0);
141 let y_val = 2.0 * x1[i] + 1.5 * x2[i] + 3.0 + noise;
142 y_values.push(y_val);
143 }
144
145 opt_df.add_column(
146 "y".to_string(),
147 pandrs::Column::Float64(pandrs::column::Float64Column::new(y_values.clone())),
148 )?;
149
150 // Convert to a regular DataFrame (to match the interface of regression analysis functions)
151 let mut df = pandrs::DataFrame::new();
152
153 // Add x1, x2, y columns to the regular DataFrame
154 df.add_column(
155 "x1".to_string(),
156 pandrs::Series::new(x1.clone(), Some("x1".to_string()))?,
157 )?;
158 df.add_column(
159 "x2".to_string(),
160 pandrs::Series::new(x2.clone(), Some("x2".to_string()))?,
161 )?;
162 df.add_column(
163 "y".to_string(),
164 pandrs::Series::new(y_values.clone(), Some("y".to_string()))?,
165 )?;
166
167 // Perform regression analysis
168 let model = pandrs::stats::linear_regression(&df, "y", &["x1", "x2"])?;
169
170 // Display results
171 println!(
172 "Linear Regression Model: y = {:.4} + {:.4} × x1 + {:.4} × x2",
173 model.intercept, model.coefficients[0], model.coefficients[1]
174 );
175 println!("R²: {:.4}", model.r_squared);
176 println!("Adjusted R²: {:.4}", model.adj_r_squared);
177 println!("p-values of regression coefficients: {:?}", model.p_values);
178
179 // Example using LazyFrame (leveraging lazy evaluation)
180 println!("\nOperations with LazyFrame:");
181
182 let lazy_df = LazyFrame::new(opt_df);
183
184 // Perform column selection and filtering by condition at once
185 // These operations are not executed until actually needed
186 let filtered = lazy_df
187 .select(&["x1", "x2", "y"])
188 // Filter condition for LazyFrame is specified as a simple string expression
189 .filter("x1 > 5.0")
190 .execute()?;
191
192 println!("Number of rows after filtering: {}", filtered.row_count());
193
194 // Example of simple regression
195 println!("\nSimple Regression Model (x1 only):");
196 let model_simple = pandrs::stats::linear_regression(&df, "y", &["x1"])?;
197 println!(
198 "Linear Regression Model: y = {:.4} + {:.4} × x1",
199 model_simple.intercept, model_simple.coefficients[0]
200 );
201 println!("R²: {:.4}", model_simple.r_squared);
202
203 Ok(())
204}
More examples
examples/lazy_parallel_example.rs (line 120)
8fn main() -> Result<(), Box<dyn Error>> {
9 println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11 // Generate a large DataFrame
12 println!("Generating a large dataset...");
13 let rows = 100_000;
14 let df = generate_large_dataframe(rows)?;
15 println!("Generated {} rows of data", rows);
16
17 // Display a portion of the DataFrame
18 println!("\nFirst row of data:");
19 println!("{:?}\n", df);
20
21 // Filtering and aggregation with the standard approach
22 println!("Executing data processing with the standard approach...");
23 let start = Instant::now();
24
25 // Create an age filter (30 years and older)
26 let age_col = df.column("Age")?;
27 let mut age_filter = vec![false; df.row_count()];
28 if let Some(int_col) = age_col.as_int64() {
29 for i in 0..df.row_count() {
30 if let Ok(Some(age)) = int_col.get(i) {
31 age_filter[i] = age >= 30;
32 }
33 }
34 }
35
36 // Add the filter to the DataFrame
37 let bool_data = pandrs::BooleanColumn::new(age_filter);
38 let mut df_with_filter = df.clone();
39 df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41 // Execute filtering
42 let filtered_df = df_with_filter.filter("30 and older")?;
43
44 // Manually aggregate by department
45 let dept_col = filtered_df.column("Department")?;
46 let salary_col = filtered_df.column("Salary")?;
47
48 // Aggregation by department
49 let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50 std::collections::HashMap::new();
51
52 if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53 for i in 0..filtered_df.row_count() {
54 if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55 let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56 entry.0 += salary;
57 entry.1 += 1;
58 }
59 }
60 }
61
62 // Construct the result
63 let mut result_depts = Vec::new();
64 let mut result_totals = Vec::new();
65 let mut result_avgs = Vec::new();
66 let mut result_counts = Vec::new();
67
68 for (dept, (total, count)) in dept_totals {
69 result_depts.push(dept);
70 result_totals.push(total);
71 result_avgs.push(total / count as f64);
72 result_counts.push(count as f64);
73 }
74
75 // Create the result DataFrame
76 let mut result_df = OptimizedDataFrame::new();
77 result_df.add_column(
78 "Department",
79 Column::String(StringColumn::new(result_depts)),
80 )?;
81 result_df.add_column(
82 "Total Salary",
83 Column::Float64(Float64Column::new(result_totals)),
84 )?;
85 result_df.add_column(
86 "Average Salary",
87 Column::Float64(Float64Column::new(result_avgs)),
88 )?;
89 result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91 let standard_duration = start.elapsed();
92 println!(
93 "Processing time with the standard approach: {:?}",
94 standard_duration
95 );
96 println!("\nResults of the standard approach:");
97 println!("{:?}\n", result_df);
98
99 // Approach using LazyFrame and parallel processing
100 println!("Approach using LazyFrame and parallel processing...");
101 let start = Instant::now();
102
103 // First, create a boolean column for the filter
104 let age_col = df.column("Age")?;
105 let mut age_filter = vec![false; df.row_count()];
106 if let Some(int_col) = age_col.as_int64() {
107 for i in 0..df.row_count() {
108 if let Ok(Some(age)) = int_col.get(i) {
109 age_filter[i] = age >= 30;
110 }
111 }
112 }
113
114 // Add the filter to the DataFrame
115 let mut df_with_age_filter = df.clone();
116 let bool_data = pandrs::BooleanColumn::new(age_filter);
117 df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119 // Recreate the LazyFrame
120 let lazy_df = LazyFrame::new(df_with_age_filter);
121 let result_lazy = lazy_df
122 .filter("Age Filter") // Use the newly added boolean column for filtering
123 .aggregate(
124 vec!["Department".to_string()],
125 vec![
126 ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127 ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128 ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129 ]
130 );
131
132 // Display the execution plan
133 println!("\nExecution Plan:");
134 println!("{}", result_lazy.explain());
135
136 // Execute
137 let lazy_result = result_lazy.execute()?;
138
139 let lazy_duration = start.elapsed();
140 println!(
141 "Processing time with the LazyFrame approach: {:?}",
142 lazy_duration
143 );
144 println!("\nResults of the LazyFrame approach:");
145 println!("{:?}\n", lazy_result);
146
147 // Performance comparison
148 let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149 println!(
150 "The LazyFrame approach is {:.2} times faster than the standard approach",
151 speedup
152 );
153
154 Ok(())
155}
examples/parallel_benchmark.rs (line 121)
7fn main() {
8 println!("Parallel Processing Performance Benchmark");
9 println!("============================");
10
11 // Data size
12 const ROWS: usize = 1_000_000;
13
14 // ====================
15 // Create Large DataFrame
16 // ====================
17 println!("\n[1] Create Large DataFrame ({} rows)", ROWS);
18
19 // --- Data Generation ---
20 let data_gen_start = Instant::now();
21
22 let mut int_data = Vec::with_capacity(ROWS);
23 let mut float_data = Vec::with_capacity(ROWS);
24 let mut str_data = Vec::with_capacity(ROWS);
25 let mut bool_data = Vec::with_capacity(ROWS);
26
27 for i in 0..ROWS {
28 int_data.push(i as i64);
29 float_data.push(i as f64 / 100.0);
30 str_data.push(format!("value_{}", i % 1000)); // Limited string set
31 bool_data.push(i % 2 == 0);
32 }
33
34 let data_gen_time = data_gen_start.elapsed();
35 println!("Data generation time: {:?}", data_gen_time);
36
37 // --- DataFrame Construction ---
38 let df_construct_start = Instant::now();
39 let mut df = OptimizedDataFrame::new();
40
41 df.add_column("id".to_string(), Column::Int64(Int64Column::new(int_data)))
42 .unwrap();
43 df.add_column(
44 "value".to_string(),
45 Column::Float64(Float64Column::new(float_data)),
46 )
47 .unwrap();
48 df.add_column(
49 "category".to_string(),
50 Column::String(StringColumn::new(str_data)),
51 )
52 .unwrap();
53 df.add_column(
54 "flag".to_string(),
55 Column::Boolean(BooleanColumn::new(bool_data)),
56 )
57 .unwrap();
58
59 let df_construct_time = df_construct_start.elapsed();
60 println!("DataFrame construction time: {:?}", df_construct_time);
61 println!(
62 "Total DataFrame creation time: {:?}",
63 data_gen_time + df_construct_time
64 );
65
66 // ====================
67 // Serial vs Parallel Filtering
68 // ====================
69 println!("\n[2] Filtering Performance (id > 500000)");
70
71 // Add a condition column
72 let condition_data: Vec<bool> = (0..ROWS).map(|i| i > ROWS / 2).collect();
73 df.add_column(
74 "filter_condition".to_string(),
75 Column::Boolean(BooleanColumn::new(condition_data)),
76 )
77 .unwrap();
78
79 // Serial filtering
80 let mut serial_total = std::time::Duration::new(0, 0);
81 for _ in 0..3 {
82 let start = Instant::now();
83 let filtered_df = df.filter("filter_condition").unwrap();
84 let duration = start.elapsed();
85 serial_total += duration;
86 println!("Serial filtering time (1 run): {:?}", duration);
87 println!("Filtered row count: {}", filtered_df.row_count());
88 }
89 let serial_time = serial_total / 3;
90 println!("Average serial filtering time: {:?}", serial_time);
91
92 // Parallel filtering
93 let mut parallel_total = std::time::Duration::new(0, 0);
94 for _ in 0..3 {
95 let start = Instant::now();
96 let par_filtered_df = df.par_filter("filter_condition").unwrap();
97 let duration = start.elapsed();
98 parallel_total += duration;
99 println!("Parallel filtering time (1 run): {:?}", duration);
100 println!("Filtered row count: {}", par_filtered_df.row_count());
101 }
102 let parallel_time = parallel_total / 3;
103 println!("Average parallel filtering time: {:?}", parallel_time);
104
105 println!(
106 "Speedup: {:.2}x",
107 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
108 );
109
110 // ====================
111 // Grouping and Aggregation
112 // ====================
113 println!("\n[3] Grouping and Aggregation (average by category)");
114
115 // Serial grouping and aggregation
116 let small_df = df.select(&["category", "value"]).unwrap();
117
118 let mut serial_total = std::time::Duration::new(0, 0);
119 for _ in 0..3 {
120 let start = Instant::now();
121 let lazy_df = LazyFrame::new(small_df.clone());
122 let grouped_df = lazy_df
123 .aggregate(
124 vec!["category".to_string()],
125 vec![(
126 "value".to_string(),
127 AggregateOp::Mean,
128 "value_mean".to_string(),
129 )],
130 )
131 .execute()
132 .unwrap();
133 let duration = start.elapsed();
134 serial_total += duration;
135 println!(
136 "Serial grouping and aggregation time (1 run): {:?}",
137 duration
138 );
139 println!("Group count: {}", grouped_df.row_count());
140 }
141 let serial_time = serial_total / 3;
142 println!(
143 "Average serial grouping and aggregation time: {:?}",
144 serial_time
145 );
146
147 // Parallel grouping and aggregation
148 let mut parallel_total = std::time::Duration::new(0, 0);
149 for _ in 0..3 {
150 let start = Instant::now();
151 let grouped_map = small_df.par_groupby(&["category"]).unwrap();
152
153 let mut result_df = OptimizedDataFrame::new();
154 let mut categories = Vec::with_capacity(grouped_map.len());
155 let mut means = Vec::with_capacity(grouped_map.len());
156
157 for (category, group_df) in &grouped_map {
158 categories.push(category.clone());
159
160 let value_col = group_df.column("value").unwrap();
161 if let Some(float_col) = value_col.as_float64() {
162 let mut sum = 0.0;
163 let mut count = 0;
164
165 for i in 0..float_col.len() {
166 if let Ok(Some(val)) = float_col.get(i) {
167 sum += val;
168 count += 1;
169 }
170 }
171
172 let mean = if count > 0 { sum / count as f64 } else { 0.0 };
173 means.push(mean);
174 }
175 }
176
177 result_df
178 .add_column(
179 "category".to_string(),
180 Column::String(StringColumn::new(categories)),
181 )
182 .unwrap();
183 result_df
184 .add_column(
185 "value_mean".to_string(),
186 Column::Float64(Float64Column::new(means)),
187 )
188 .unwrap();
189
190 let duration = start.elapsed();
191 parallel_total += duration;
192 println!(
193 "Parallel grouping and aggregation time (1 run): {:?}",
194 duration
195 );
196 println!("Group count: {}", result_df.row_count());
197 }
198 let parallel_time = parallel_total / 3;
199 println!(
200 "Average parallel grouping and aggregation time: {:?}",
201 parallel_time
202 );
203
204 println!(
205 "Speedup: {:.2}x",
206 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
207 );
208
209 // ====================
210 // Computation (double all values)
211 // ====================
212 println!("\n[4] Computation (double the values in the 'value' column)");
213
214 // Serial computation
215 let start = Instant::now();
216 let mut computed_df = OptimizedDataFrame::new();
217
218 for name in df.column_names() {
219 let col_view = df.column(name).unwrap();
220
221 let new_col = if name == "value" {
222 let float_col = col_view.as_float64().unwrap();
223 let mut doubled_values = Vec::with_capacity(float_col.len());
224
225 for i in 0..float_col.len() {
226 if let Ok(Some(val)) = float_col.get(i) {
227 doubled_values.push(val * 2.0);
228 } else {
229 doubled_values.push(0.0);
230 }
231 }
232
233 Column::Float64(Float64Column::new(doubled_values))
234 } else {
235 col_view.into_column()
236 };
237
238 computed_df.add_column(name.to_string(), new_col).unwrap();
239 }
240
241 let serial_time = start.elapsed();
242 println!("Serial computation time: {:?}", serial_time);
243
244 // Parallel computation
245 let start = Instant::now();
246 let _par_computed_df = df
247 .par_apply(|view| {
248 if view.as_float64().is_some() {
249 if let Some(float_col) = view.as_float64() {
250 use rayon::prelude::*;
251
252 let values = (0..float_col.len())
253 .into_par_iter()
254 .map(|i| {
255 if let Ok(Some(val)) = float_col.get(i) {
256 val * 2.0
257 } else {
258 0.0
259 }
260 })
261 .collect::<Vec<_>>();
262
263 Ok(Column::Float64(Float64Column::new(values)))
264 } else {
265 Ok(view.clone().into_column())
266 }
267 } else {
268 Ok(view.clone().into_column())
269 }
270 })
271 .unwrap();
272
273 let parallel_time = start.elapsed();
274 println!("Parallel computation time: {:?}", parallel_time);
275
276 println!(
277 "Speedup: {:.2}x",
278 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
279 );
280
281 println!("\nParallel Benchmark Complete");
282}
Source · pub fn select(self, columns: &[&str]) -> Self
pub fn select(self, columns: &[&str]) -> Self
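Selects the columns named in `columns`. This consumes the `LazyFrame` and returns it, so calls can be chained; as the example below notes, the operation is not carried out until the result is actually needed (i.e. when `execute()` is called).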
Examples found in repository?
examples/optimized_stats_example.rs (line 187)
115fn optimized_regression_example() -> Result<()> {
116 println!("3. Example of Regression Analysis with Optimized Implementation");
117 println!("--------------------------");
118
119 // Create an optimized DataFrame
120 let mut opt_df = OptimizedDataFrame::new();
121
122 // Add explanatory variables as type-specialized columns
123 let x1: Vec<f64> = (1..=10).map(|i| i as f64).collect();
124 let x2: Vec<f64> = (1..=10).map(|i| 5.0 + 3.0 * i as f64).collect();
125
126 opt_df.add_column(
127 "x1".to_string(),
128 pandrs::Column::Float64(pandrs::column::Float64Column::new(x1.clone())),
129 )?;
130 opt_df.add_column(
131 "x2".to_string(),
132 pandrs::Column::Float64(pandrs::column::Float64Column::new(x2.clone())),
133 )?;
134
135 // Dependent variable (y = 2*x1 + 1.5*x2 + 3 + noise)
136 let mut y_values = Vec::with_capacity(10);
137 let mut rng = rng();
138
139 for i in 0..10 {
140 let noise = rng.random_range(-1.0..1.0);
141 let y_val = 2.0 * x1[i] + 1.5 * x2[i] + 3.0 + noise;
142 y_values.push(y_val);
143 }
144
145 opt_df.add_column(
146 "y".to_string(),
147 pandrs::Column::Float64(pandrs::column::Float64Column::new(y_values.clone())),
148 )?;
149
150 // Convert to a regular DataFrame (to match the interface of regression analysis functions)
151 let mut df = pandrs::DataFrame::new();
152
153 // Add x1, x2, y columns to the regular DataFrame
154 df.add_column(
155 "x1".to_string(),
156 pandrs::Series::new(x1.clone(), Some("x1".to_string()))?,
157 )?;
158 df.add_column(
159 "x2".to_string(),
160 pandrs::Series::new(x2.clone(), Some("x2".to_string()))?,
161 )?;
162 df.add_column(
163 "y".to_string(),
164 pandrs::Series::new(y_values.clone(), Some("y".to_string()))?,
165 )?;
166
167 // Perform regression analysis
168 let model = pandrs::stats::linear_regression(&df, "y", &["x1", "x2"])?;
169
170 // Display results
171 println!(
172 "Linear Regression Model: y = {:.4} + {:.4} × x1 + {:.4} × x2",
173 model.intercept, model.coefficients[0], model.coefficients[1]
174 );
175 println!("R²: {:.4}", model.r_squared);
176 println!("Adjusted R²: {:.4}", model.adj_r_squared);
177 println!("p-values of regression coefficients: {:?}", model.p_values);
178
179 // Example using LazyFrame (leveraging lazy evaluation)
180 println!("\nOperations with LazyFrame:");
181
182 let lazy_df = LazyFrame::new(opt_df);
183
184 // Perform column selection and filtering by condition at once
185 // These operations are not executed until actually needed
186 let filtered = lazy_df
187 .select(&["x1", "x2", "y"])
188 // Filter condition for LazyFrame is specified as a simple string expression
189 .filter("x1 > 5.0")
190 .execute()?;
191
192 println!("Number of rows after filtering: {}", filtered.row_count());
193
194 // Example of simple regression
195 println!("\nSimple Regression Model (x1 only):");
196 let model_simple = pandrs::stats::linear_regression(&df, "y", &["x1"])?;
197 println!(
198 "Linear Regression Model: y = {:.4} + {:.4} × x1",
199 model_simple.intercept, model_simple.coefficients[0]
200 );
201 println!("R²: {:.4}", model_simple.r_squared);
202
203 Ok(())
204}
Source · pub fn filter(self, condition: &str) -> Self
pub fn filter(self, condition: &str) -> Self
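Filters rows according to `condition`. This consumes the `LazyFrame` and returns it, so calls can be chained. In the repository examples below, `condition` is either the name of a boolean column (e.g. `"Age Filter"`) or a simple string expression (e.g. `"x1 > 5.0"`).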
Examples found in repository?
examples/optimized_stats_example.rs (line 189)
115fn optimized_regression_example() -> Result<()> {
116 println!("3. Example of Regression Analysis with Optimized Implementation");
117 println!("--------------------------");
118
119 // Create an optimized DataFrame
120 let mut opt_df = OptimizedDataFrame::new();
121
122 // Add explanatory variables as type-specialized columns
123 let x1: Vec<f64> = (1..=10).map(|i| i as f64).collect();
124 let x2: Vec<f64> = (1..=10).map(|i| 5.0 + 3.0 * i as f64).collect();
125
126 opt_df.add_column(
127 "x1".to_string(),
128 pandrs::Column::Float64(pandrs::column::Float64Column::new(x1.clone())),
129 )?;
130 opt_df.add_column(
131 "x2".to_string(),
132 pandrs::Column::Float64(pandrs::column::Float64Column::new(x2.clone())),
133 )?;
134
135 // Dependent variable (y = 2*x1 + 1.5*x2 + 3 + noise)
136 let mut y_values = Vec::with_capacity(10);
137 let mut rng = rng();
138
139 for i in 0..10 {
140 let noise = rng.random_range(-1.0..1.0);
141 let y_val = 2.0 * x1[i] + 1.5 * x2[i] + 3.0 + noise;
142 y_values.push(y_val);
143 }
144
145 opt_df.add_column(
146 "y".to_string(),
147 pandrs::Column::Float64(pandrs::column::Float64Column::new(y_values.clone())),
148 )?;
149
150 // Convert to a regular DataFrame (to match the interface of regression analysis functions)
151 let mut df = pandrs::DataFrame::new();
152
153 // Add x1, x2, y columns to the regular DataFrame
154 df.add_column(
155 "x1".to_string(),
156 pandrs::Series::new(x1.clone(), Some("x1".to_string()))?,
157 )?;
158 df.add_column(
159 "x2".to_string(),
160 pandrs::Series::new(x2.clone(), Some("x2".to_string()))?,
161 )?;
162 df.add_column(
163 "y".to_string(),
164 pandrs::Series::new(y_values.clone(), Some("y".to_string()))?,
165 )?;
166
167 // Perform regression analysis
168 let model = pandrs::stats::linear_regression(&df, "y", &["x1", "x2"])?;
169
170 // Display results
171 println!(
172 "Linear Regression Model: y = {:.4} + {:.4} × x1 + {:.4} × x2",
173 model.intercept, model.coefficients[0], model.coefficients[1]
174 );
175 println!("R²: {:.4}", model.r_squared);
176 println!("Adjusted R²: {:.4}", model.adj_r_squared);
177 println!("p-values of regression coefficients: {:?}", model.p_values);
178
179 // Example using LazyFrame (leveraging lazy evaluation)
180 println!("\nOperations with LazyFrame:");
181
182 let lazy_df = LazyFrame::new(opt_df);
183
184 // Perform column selection and filtering by condition at once
185 // These operations are not executed until actually needed
186 let filtered = lazy_df
187 .select(&["x1", "x2", "y"])
188 // Filter condition for LazyFrame is specified as a simple string expression
189 .filter("x1 > 5.0")
190 .execute()?;
191
192 println!("Number of rows after filtering: {}", filtered.row_count());
193
194 // Example of simple regression
195 println!("\nSimple Regression Model (x1 only):");
196 let model_simple = pandrs::stats::linear_regression(&df, "y", &["x1"])?;
197 println!(
198 "Linear Regression Model: y = {:.4} + {:.4} × x1",
199 model_simple.intercept, model_simple.coefficients[0]
200 );
201 println!("R²: {:.4}", model_simple.r_squared);
202
203 Ok(())
204}
More examples
examples/lazy_parallel_example.rs (line 122)
8fn main() -> Result<(), Box<dyn Error>> {
9 println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11 // Generate a large DataFrame
12 println!("Generating a large dataset...");
13 let rows = 100_000;
14 let df = generate_large_dataframe(rows)?;
15 println!("Generated {} rows of data", rows);
16
17 // Display a portion of the DataFrame
18 println!("\nFirst row of data:");
19 println!("{:?}\n", df);
20
21 // Filtering and aggregation with the standard approach
22 println!("Executing data processing with the standard approach...");
23 let start = Instant::now();
24
25 // Create an age filter (30 years and older)
26 let age_col = df.column("Age")?;
27 let mut age_filter = vec![false; df.row_count()];
28 if let Some(int_col) = age_col.as_int64() {
29 for i in 0..df.row_count() {
30 if let Ok(Some(age)) = int_col.get(i) {
31 age_filter[i] = age >= 30;
32 }
33 }
34 }
35
36 // Add the filter to the DataFrame
37 let bool_data = pandrs::BooleanColumn::new(age_filter);
38 let mut df_with_filter = df.clone();
39 df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41 // Execute filtering
42 let filtered_df = df_with_filter.filter("30 and older")?;
43
44 // Manually aggregate by department
45 let dept_col = filtered_df.column("Department")?;
46 let salary_col = filtered_df.column("Salary")?;
47
48 // Aggregation by department
49 let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50 std::collections::HashMap::new();
51
52 if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53 for i in 0..filtered_df.row_count() {
54 if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55 let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56 entry.0 += salary;
57 entry.1 += 1;
58 }
59 }
60 }
61
62 // Construct the result
63 let mut result_depts = Vec::new();
64 let mut result_totals = Vec::new();
65 let mut result_avgs = Vec::new();
66 let mut result_counts = Vec::new();
67
68 for (dept, (total, count)) in dept_totals {
69 result_depts.push(dept);
70 result_totals.push(total);
71 result_avgs.push(total / count as f64);
72 result_counts.push(count as f64);
73 }
74
75 // Create the result DataFrame
76 let mut result_df = OptimizedDataFrame::new();
77 result_df.add_column(
78 "Department",
79 Column::String(StringColumn::new(result_depts)),
80 )?;
81 result_df.add_column(
82 "Total Salary",
83 Column::Float64(Float64Column::new(result_totals)),
84 )?;
85 result_df.add_column(
86 "Average Salary",
87 Column::Float64(Float64Column::new(result_avgs)),
88 )?;
89 result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91 let standard_duration = start.elapsed();
92 println!(
93 "Processing time with the standard approach: {:?}",
94 standard_duration
95 );
96 println!("\nResults of the standard approach:");
97 println!("{:?}\n", result_df);
98
99 // Approach using LazyFrame and parallel processing
100 println!("Approach using LazyFrame and parallel processing...");
101 let start = Instant::now();
102
103 // First, create a boolean column for the filter
104 let age_col = df.column("Age")?;
105 let mut age_filter = vec![false; df.row_count()];
106 if let Some(int_col) = age_col.as_int64() {
107 for i in 0..df.row_count() {
108 if let Ok(Some(age)) = int_col.get(i) {
109 age_filter[i] = age >= 30;
110 }
111 }
112 }
113
114 // Add the filter to the DataFrame
115 let mut df_with_age_filter = df.clone();
116 let bool_data = pandrs::BooleanColumn::new(age_filter);
117 df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119 // Recreate the LazyFrame
120 let lazy_df = LazyFrame::new(df_with_age_filter);
121 let result_lazy = lazy_df
122 .filter("Age Filter") // Use the newly added boolean column for filtering
123 .aggregate(
124 vec!["Department".to_string()],
125 vec![
126 ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127 ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128 ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129 ]
130 );
131
132 // Display the execution plan
133 println!("\nExecution Plan:");
134 println!("{}", result_lazy.explain());
135
136 // Execute
137 let lazy_result = result_lazy.execute()?;
138
139 let lazy_duration = start.elapsed();
140 println!(
141 "Processing time with the LazyFrame approach: {:?}",
142 lazy_duration
143 );
144 println!("\nResults of the LazyFrame approach:");
145 println!("{:?}\n", lazy_result);
146
147 // Performance comparison
148 let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149 println!(
150 "The LazyFrame approach is {:.2} times faster than the standard approach",
151 speedup
152 );
153
154 Ok(())
155}
Source · pub fn aggregate<I, J>(self, group_by: I, aggregations: J) -> Self
pub fn aggregate<I, J>(self, group_by: I, aggregations: J) -> Self
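Groups the data by the `group_by` columns and applies the given `aggregations`. This consumes the `LazyFrame` and returns it, so calls can be chained. In the repository examples below, `group_by` is a `Vec<String>` of column names and each aggregation is a `(String, AggregateOp, String)` tuple — the source column, the operation (e.g. `AggregateOp::Sum`, `AggregateOp::Mean`, `AggregateOp::Count`), and what appears to be the name of the output column.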
Examples found in repository?
examples/lazy_parallel_example.rs (lines 123-130)
8fn main() -> Result<(), Box<dyn Error>> {
9 println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11 // Generate a large DataFrame
12 println!("Generating a large dataset...");
13 let rows = 100_000;
14 let df = generate_large_dataframe(rows)?;
15 println!("Generated {} rows of data", rows);
16
17 // Display a portion of the DataFrame
18 println!("\nFirst row of data:");
19 println!("{:?}\n", df);
20
21 // Filtering and aggregation with the standard approach
22 println!("Executing data processing with the standard approach...");
23 let start = Instant::now();
24
25 // Create an age filter (30 years and older)
26 let age_col = df.column("Age")?;
27 let mut age_filter = vec![false; df.row_count()];
28 if let Some(int_col) = age_col.as_int64() {
29 for i in 0..df.row_count() {
30 if let Ok(Some(age)) = int_col.get(i) {
31 age_filter[i] = age >= 30;
32 }
33 }
34 }
35
36 // Add the filter to the DataFrame
37 let bool_data = pandrs::BooleanColumn::new(age_filter);
38 let mut df_with_filter = df.clone();
39 df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41 // Execute filtering
42 let filtered_df = df_with_filter.filter("30 and older")?;
43
44 // Manually aggregate by department
45 let dept_col = filtered_df.column("Department")?;
46 let salary_col = filtered_df.column("Salary")?;
47
48 // Aggregation by department
49 let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50 std::collections::HashMap::new();
51
52 if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53 for i in 0..filtered_df.row_count() {
54 if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55 let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56 entry.0 += salary;
57 entry.1 += 1;
58 }
59 }
60 }
61
62 // Construct the result
63 let mut result_depts = Vec::new();
64 let mut result_totals = Vec::new();
65 let mut result_avgs = Vec::new();
66 let mut result_counts = Vec::new();
67
68 for (dept, (total, count)) in dept_totals {
69 result_depts.push(dept);
70 result_totals.push(total);
71 result_avgs.push(total / count as f64);
72 result_counts.push(count as f64);
73 }
74
75 // Create the result DataFrame
76 let mut result_df = OptimizedDataFrame::new();
77 result_df.add_column(
78 "Department",
79 Column::String(StringColumn::new(result_depts)),
80 )?;
81 result_df.add_column(
82 "Total Salary",
83 Column::Float64(Float64Column::new(result_totals)),
84 )?;
85 result_df.add_column(
86 "Average Salary",
87 Column::Float64(Float64Column::new(result_avgs)),
88 )?;
89 result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91 let standard_duration = start.elapsed();
92 println!(
93 "Processing time with the standard approach: {:?}",
94 standard_duration
95 );
96 println!("\nResults of the standard approach:");
97 println!("{:?}\n", result_df);
98
99 // Approach using LazyFrame and parallel processing
100 println!("Approach using LazyFrame and parallel processing...");
101 let start = Instant::now();
102
103 // First, create a boolean column for the filter
104 let age_col = df.column("Age")?;
105 let mut age_filter = vec![false; df.row_count()];
106 if let Some(int_col) = age_col.as_int64() {
107 for i in 0..df.row_count() {
108 if let Ok(Some(age)) = int_col.get(i) {
109 age_filter[i] = age >= 30;
110 }
111 }
112 }
113
114 // Add the filter to the DataFrame
115 let mut df_with_age_filter = df.clone();
116 let bool_data = pandrs::BooleanColumn::new(age_filter);
117 df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119 // Recreate the LazyFrame
120 let lazy_df = LazyFrame::new(df_with_age_filter);
121 let result_lazy = lazy_df
122 .filter("Age Filter") // Use the newly added boolean column for filtering
123 .aggregate(
124 vec!["Department".to_string()],
125 vec![
126 ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127 ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128 ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129 ]
130 );
131
132 // Display the execution plan
133 println!("\nExecution Plan:");
134 println!("{}", result_lazy.explain());
135
136 // Execute
137 let lazy_result = result_lazy.execute()?;
138
139 let lazy_duration = start.elapsed();
140 println!(
141 "Processing time with the LazyFrame approach: {:?}",
142 lazy_duration
143 );
144 println!("\nResults of the LazyFrame approach:");
145 println!("{:?}\n", lazy_result);
146
147 // Performance comparison
148 let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149 println!(
150 "The LazyFrame approach is {:.2} times faster than the standard approach",
151 speedup
152 );
153
154 Ok(())
155}
More examples
examples/parallel_benchmark.rs (lines 123-130)
7fn main() {
8 println!("Parallel Processing Performance Benchmark");
9 println!("============================");
10
11 // Data size
12 const ROWS: usize = 1_000_000;
13
14 // ====================
15 // Create Large DataFrame
16 // ====================
17 println!("\n[1] Create Large DataFrame ({} rows)", ROWS);
18
19 // --- Data Generation ---
20 let data_gen_start = Instant::now();
21
22 let mut int_data = Vec::with_capacity(ROWS);
23 let mut float_data = Vec::with_capacity(ROWS);
24 let mut str_data = Vec::with_capacity(ROWS);
25 let mut bool_data = Vec::with_capacity(ROWS);
26
27 for i in 0..ROWS {
28 int_data.push(i as i64);
29 float_data.push(i as f64 / 100.0);
30 str_data.push(format!("value_{}", i % 1000)); // Limited string set
31 bool_data.push(i % 2 == 0);
32 }
33
34 let data_gen_time = data_gen_start.elapsed();
35 println!("Data generation time: {:?}", data_gen_time);
36
37 // --- DataFrame Construction ---
38 let df_construct_start = Instant::now();
39 let mut df = OptimizedDataFrame::new();
40
41 df.add_column("id".to_string(), Column::Int64(Int64Column::new(int_data)))
42 .unwrap();
43 df.add_column(
44 "value".to_string(),
45 Column::Float64(Float64Column::new(float_data)),
46 )
47 .unwrap();
48 df.add_column(
49 "category".to_string(),
50 Column::String(StringColumn::new(str_data)),
51 )
52 .unwrap();
53 df.add_column(
54 "flag".to_string(),
55 Column::Boolean(BooleanColumn::new(bool_data)),
56 )
57 .unwrap();
58
59 let df_construct_time = df_construct_start.elapsed();
60 println!("DataFrame construction time: {:?}", df_construct_time);
61 println!(
62 "Total DataFrame creation time: {:?}",
63 data_gen_time + df_construct_time
64 );
65
66 // ====================
67 // Serial vs Parallel Filtering
68 // ====================
69 println!("\n[2] Filtering Performance (id > 500000)");
70
71 // Add a condition column
72 let condition_data: Vec<bool> = (0..ROWS).map(|i| i > ROWS / 2).collect();
73 df.add_column(
74 "filter_condition".to_string(),
75 Column::Boolean(BooleanColumn::new(condition_data)),
76 )
77 .unwrap();
78
79 // Serial filtering
80 let mut serial_total = std::time::Duration::new(0, 0);
81 for _ in 0..3 {
82 let start = Instant::now();
83 let filtered_df = df.filter("filter_condition").unwrap();
84 let duration = start.elapsed();
85 serial_total += duration;
86 println!("Serial filtering time (1 run): {:?}", duration);
87 println!("Filtered row count: {}", filtered_df.row_count());
88 }
89 let serial_time = serial_total / 3;
90 println!("Average serial filtering time: {:?}", serial_time);
91
92 // Parallel filtering
93 let mut parallel_total = std::time::Duration::new(0, 0);
94 for _ in 0..3 {
95 let start = Instant::now();
96 let par_filtered_df = df.par_filter("filter_condition").unwrap();
97 let duration = start.elapsed();
98 parallel_total += duration;
99 println!("Parallel filtering time (1 run): {:?}", duration);
100 println!("Filtered row count: {}", par_filtered_df.row_count());
101 }
102 let parallel_time = parallel_total / 3;
103 println!("Average parallel filtering time: {:?}", parallel_time);
104
105 println!(
106 "Speedup: {:.2}x",
107 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
108 );
109
110 // ====================
111 // Grouping and Aggregation
112 // ====================
113 println!("\n[3] Grouping and Aggregation (average by category)");
114
115 // Serial grouping and aggregation
116 let small_df = df.select(&["category", "value"]).unwrap();
117
118 let mut serial_total = std::time::Duration::new(0, 0);
119 for _ in 0..3 {
120 let start = Instant::now();
121 let lazy_df = LazyFrame::new(small_df.clone());
122 let grouped_df = lazy_df
123 .aggregate(
124 vec!["category".to_string()],
125 vec![(
126 "value".to_string(),
127 AggregateOp::Mean,
128 "value_mean".to_string(),
129 )],
130 )
131 .execute()
132 .unwrap();
133 let duration = start.elapsed();
134 serial_total += duration;
135 println!(
136 "Serial grouping and aggregation time (1 run): {:?}",
137 duration
138 );
139 println!("Group count: {}", grouped_df.row_count());
140 }
141 let serial_time = serial_total / 3;
142 println!(
143 "Average serial grouping and aggregation time: {:?}",
144 serial_time
145 );
146
147 // Parallel grouping and aggregation
148 let mut parallel_total = std::time::Duration::new(0, 0);
149 for _ in 0..3 {
150 let start = Instant::now();
151 let grouped_map = small_df.par_groupby(&["category"]).unwrap();
152
153 let mut result_df = OptimizedDataFrame::new();
154 let mut categories = Vec::with_capacity(grouped_map.len());
155 let mut means = Vec::with_capacity(grouped_map.len());
156
157 for (category, group_df) in &grouped_map {
158 categories.push(category.clone());
159
160 let value_col = group_df.column("value").unwrap();
161 if let Some(float_col) = value_col.as_float64() {
162 let mut sum = 0.0;
163 let mut count = 0;
164
165 for i in 0..float_col.len() {
166 if let Ok(Some(val)) = float_col.get(i) {
167 sum += val;
168 count += 1;
169 }
170 }
171
172 let mean = if count > 0 { sum / count as f64 } else { 0.0 };
173 means.push(mean);
174 }
175 }
176
177 result_df
178 .add_column(
179 "category".to_string(),
180 Column::String(StringColumn::new(categories)),
181 )
182 .unwrap();
183 result_df
184 .add_column(
185 "value_mean".to_string(),
186 Column::Float64(Float64Column::new(means)),
187 )
188 .unwrap();
189
190 let duration = start.elapsed();
191 parallel_total += duration;
192 println!(
193 "Parallel grouping and aggregation time (1 run): {:?}",
194 duration
195 );
196 println!("Group count: {}", result_df.row_count());
197 }
198 let parallel_time = parallel_total / 3;
199 println!(
200 "Average parallel grouping and aggregation time: {:?}",
201 parallel_time
202 );
203
204 println!(
205 "Speedup: {:.2}x",
206 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
207 );
208
209 // ====================
210 // Computation (double all values)
211 // ====================
212 println!("\n[4] Computation (double the values in the 'value' column)");
213
214 // Serial computation
215 let start = Instant::now();
216 let mut computed_df = OptimizedDataFrame::new();
217
218 for name in df.column_names() {
219 let col_view = df.column(name).unwrap();
220
221 let new_col = if name == "value" {
222 let float_col = col_view.as_float64().unwrap();
223 let mut doubled_values = Vec::with_capacity(float_col.len());
224
225 for i in 0..float_col.len() {
226 if let Ok(Some(val)) = float_col.get(i) {
227 doubled_values.push(val * 2.0);
228 } else {
229 doubled_values.push(0.0);
230 }
231 }
232
233 Column::Float64(Float64Column::new(doubled_values))
234 } else {
235 col_view.into_column()
236 };
237
238 computed_df.add_column(name.to_string(), new_col).unwrap();
239 }
240
241 let serial_time = start.elapsed();
242 println!("Serial computation time: {:?}", serial_time);
243
244 // Parallel computation
245 let start = Instant::now();
246 let _par_computed_df = df
247 .par_apply(|view| {
248 if view.as_float64().is_some() {
249 if let Some(float_col) = view.as_float64() {
250 use rayon::prelude::*;
251
252 let values = (0..float_col.len())
253 .into_par_iter()
254 .map(|i| {
255 if let Ok(Some(val)) = float_col.get(i) {
256 val * 2.0
257 } else {
258 0.0
259 }
260 })
261 .collect::<Vec<_>>();
262
263 Ok(Column::Float64(Float64Column::new(values)))
264 } else {
265 Ok(view.clone().into_column())
266 }
267 } else {
268 Ok(view.clone().into_column())
269 }
270 })
271 .unwrap();
272
273 let parallel_time = start.elapsed();
274 println!("Parallel computation time: {:?}", parallel_time);
275
276 println!(
277 "Speedup: {:.2}x",
278 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
279 );
280
281 println!("\nParallel Benchmark Complete");
282}
Source pub fn join(
self,
right: OptimizedDataFrame,
left_on: &str,
right_on: &str,
join_type: JoinType,
) -> Self
pub fn join( self, right: OptimizedDataFrame, left_on: &str, right_on: &str, join_type: JoinType, ) -> Self
Join operation
Source pub fn execute(self) -> Result<OptimizedDataFrame>
pub fn execute(self) -> Result<OptimizedDataFrame>
Execute computation graph and get results
Examples found in repository?
examples/optimized_stats_example.rs (line 190)
115fn optimized_regression_example() -> Result<()> {
116 println!("3. Example of Regression Analysis with Optimized Implementation");
117 println!("--------------------------");
118
119 // Create an optimized DataFrame
120 let mut opt_df = OptimizedDataFrame::new();
121
122 // Add explanatory variables as type-specialized columns
123 let x1: Vec<f64> = (1..=10).map(|i| i as f64).collect();
124 let x2: Vec<f64> = (1..=10).map(|i| 5.0 + 3.0 * i as f64).collect();
125
126 opt_df.add_column(
127 "x1".to_string(),
128 pandrs::Column::Float64(pandrs::column::Float64Column::new(x1.clone())),
129 )?;
130 opt_df.add_column(
131 "x2".to_string(),
132 pandrs::Column::Float64(pandrs::column::Float64Column::new(x2.clone())),
133 )?;
134
135 // Dependent variable (y = 2*x1 + 1.5*x2 + 3 + noise)
136 let mut y_values = Vec::with_capacity(10);
137 let mut rng = rng();
138
139 for i in 0..10 {
140 let noise = rng.random_range(-1.0..1.0);
141 let y_val = 2.0 * x1[i] + 1.5 * x2[i] + 3.0 + noise;
142 y_values.push(y_val);
143 }
144
145 opt_df.add_column(
146 "y".to_string(),
147 pandrs::Column::Float64(pandrs::column::Float64Column::new(y_values.clone())),
148 )?;
149
150 // Convert to a regular DataFrame (to match the interface of regression analysis functions)
151 let mut df = pandrs::DataFrame::new();
152
153 // Add x1, x2, y columns to the regular DataFrame
154 df.add_column(
155 "x1".to_string(),
156 pandrs::Series::new(x1.clone(), Some("x1".to_string()))?,
157 )?;
158 df.add_column(
159 "x2".to_string(),
160 pandrs::Series::new(x2.clone(), Some("x2".to_string()))?,
161 )?;
162 df.add_column(
163 "y".to_string(),
164 pandrs::Series::new(y_values.clone(), Some("y".to_string()))?,
165 )?;
166
167 // Perform regression analysis
168 let model = pandrs::stats::linear_regression(&df, "y", &["x1", "x2"])?;
169
170 // Display results
171 println!(
172 "Linear Regression Model: y = {:.4} + {:.4} × x1 + {:.4} × x2",
173 model.intercept, model.coefficients[0], model.coefficients[1]
174 );
175 println!("R²: {:.4}", model.r_squared);
176 println!("Adjusted R²: {:.4}", model.adj_r_squared);
177 println!("p-values of regression coefficients: {:?}", model.p_values);
178
179 // Example using LazyFrame (leveraging lazy evaluation)
180 println!("\nOperations with LazyFrame:");
181
182 let lazy_df = LazyFrame::new(opt_df);
183
184 // Perform column selection and filtering by condition at once
185 // These operations are not executed until actually needed
186 let filtered = lazy_df
187 .select(&["x1", "x2", "y"])
188 // Filter condition for LazyFrame is specified as a simple string expression
189 .filter("x1 > 5.0")
190 .execute()?;
191
192 println!("Number of rows after filtering: {}", filtered.row_count());
193
194 // Example of simple regression
195 println!("\nSimple Regression Model (x1 only):");
196 let model_simple = pandrs::stats::linear_regression(&df, "y", &["x1"])?;
197 println!(
198 "Linear Regression Model: y = {:.4} + {:.4} × x1",
199 model_simple.intercept, model_simple.coefficients[0]
200 );
201 println!("R²: {:.4}", model_simple.r_squared);
202
203 Ok(())
204}
More examples
examples/lazy_parallel_example.rs (line 137)
8fn main() -> Result<(), Box<dyn Error>> {
9 println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11 // Generate a large DataFrame
12 println!("Generating a large dataset...");
13 let rows = 100_000;
14 let df = generate_large_dataframe(rows)?;
15 println!("Generated {} rows of data", rows);
16
17 // Display a portion of the DataFrame
18 println!("\nFirst row of data:");
19 println!("{:?}\n", df);
20
21 // Filtering and aggregation with the standard approach
22 println!("Executing data processing with the standard approach...");
23 let start = Instant::now();
24
25 // Create an age filter (30 years and older)
26 let age_col = df.column("Age")?;
27 let mut age_filter = vec![false; df.row_count()];
28 if let Some(int_col) = age_col.as_int64() {
29 for i in 0..df.row_count() {
30 if let Ok(Some(age)) = int_col.get(i) {
31 age_filter[i] = age >= 30;
32 }
33 }
34 }
35
36 // Add the filter to the DataFrame
37 let bool_data = pandrs::BooleanColumn::new(age_filter);
38 let mut df_with_filter = df.clone();
39 df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41 // Execute filtering
42 let filtered_df = df_with_filter.filter("30 and older")?;
43
44 // Manually aggregate by department
45 let dept_col = filtered_df.column("Department")?;
46 let salary_col = filtered_df.column("Salary")?;
47
48 // Aggregation by department
49 let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50 std::collections::HashMap::new();
51
52 if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53 for i in 0..filtered_df.row_count() {
54 if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55 let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56 entry.0 += salary;
57 entry.1 += 1;
58 }
59 }
60 }
61
62 // Construct the result
63 let mut result_depts = Vec::new();
64 let mut result_totals = Vec::new();
65 let mut result_avgs = Vec::new();
66 let mut result_counts = Vec::new();
67
68 for (dept, (total, count)) in dept_totals {
69 result_depts.push(dept);
70 result_totals.push(total);
71 result_avgs.push(total / count as f64);
72 result_counts.push(count as f64);
73 }
74
75 // Create the result DataFrame
76 let mut result_df = OptimizedDataFrame::new();
77 result_df.add_column(
78 "Department",
79 Column::String(StringColumn::new(result_depts)),
80 )?;
81 result_df.add_column(
82 "Total Salary",
83 Column::Float64(Float64Column::new(result_totals)),
84 )?;
85 result_df.add_column(
86 "Average Salary",
87 Column::Float64(Float64Column::new(result_avgs)),
88 )?;
89 result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91 let standard_duration = start.elapsed();
92 println!(
93 "Processing time with the standard approach: {:?}",
94 standard_duration
95 );
96 println!("\nResults of the standard approach:");
97 println!("{:?}\n", result_df);
98
99 // Approach using LazyFrame and parallel processing
100 println!("Approach using LazyFrame and parallel processing...");
101 let start = Instant::now();
102
103 // First, create a boolean column for the filter
104 let age_col = df.column("Age")?;
105 let mut age_filter = vec![false; df.row_count()];
106 if let Some(int_col) = age_col.as_int64() {
107 for i in 0..df.row_count() {
108 if let Ok(Some(age)) = int_col.get(i) {
109 age_filter[i] = age >= 30;
110 }
111 }
112 }
113
114 // Add the filter to the DataFrame
115 let mut df_with_age_filter = df.clone();
116 let bool_data = pandrs::BooleanColumn::new(age_filter);
117 df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119 // Recreate the LazyFrame
120 let lazy_df = LazyFrame::new(df_with_age_filter);
121 let result_lazy = lazy_df
122 .filter("Age Filter") // Use the newly added boolean column for filtering
123 .aggregate(
124 vec!["Department".to_string()],
125 vec![
126 ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127 ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128 ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129 ]
130 );
131
132 // Display the execution plan
133 println!("\nExecution Plan:");
134 println!("{}", result_lazy.explain());
135
136 // Execute
137 let lazy_result = result_lazy.execute()?;
138
139 let lazy_duration = start.elapsed();
140 println!(
141 "Processing time with the LazyFrame approach: {:?}",
142 lazy_duration
143 );
144 println!("\nResults of the LazyFrame approach:");
145 println!("{:?}\n", lazy_result);
146
147 // Performance comparison
148 let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149 println!(
150 "The LazyFrame approach is {:.2} times faster than the standard approach",
151 speedup
152 );
153
154 Ok(())
155}
examples/parallel_benchmark.rs (line 131)
7fn main() {
8 println!("Parallel Processing Performance Benchmark");
9 println!("============================");
10
11 // Data size
12 const ROWS: usize = 1_000_000;
13
14 // ====================
15 // Create Large DataFrame
16 // ====================
17 println!("\n[1] Create Large DataFrame ({} rows)", ROWS);
18
19 // --- Data Generation ---
20 let data_gen_start = Instant::now();
21
22 let mut int_data = Vec::with_capacity(ROWS);
23 let mut float_data = Vec::with_capacity(ROWS);
24 let mut str_data = Vec::with_capacity(ROWS);
25 let mut bool_data = Vec::with_capacity(ROWS);
26
27 for i in 0..ROWS {
28 int_data.push(i as i64);
29 float_data.push(i as f64 / 100.0);
30 str_data.push(format!("value_{}", i % 1000)); // Limited string set
31 bool_data.push(i % 2 == 0);
32 }
33
34 let data_gen_time = data_gen_start.elapsed();
35 println!("Data generation time: {:?}", data_gen_time);
36
37 // --- DataFrame Construction ---
38 let df_construct_start = Instant::now();
39 let mut df = OptimizedDataFrame::new();
40
41 df.add_column("id".to_string(), Column::Int64(Int64Column::new(int_data)))
42 .unwrap();
43 df.add_column(
44 "value".to_string(),
45 Column::Float64(Float64Column::new(float_data)),
46 )
47 .unwrap();
48 df.add_column(
49 "category".to_string(),
50 Column::String(StringColumn::new(str_data)),
51 )
52 .unwrap();
53 df.add_column(
54 "flag".to_string(),
55 Column::Boolean(BooleanColumn::new(bool_data)),
56 )
57 .unwrap();
58
59 let df_construct_time = df_construct_start.elapsed();
60 println!("DataFrame construction time: {:?}", df_construct_time);
61 println!(
62 "Total DataFrame creation time: {:?}",
63 data_gen_time + df_construct_time
64 );
65
66 // ====================
67 // Serial vs Parallel Filtering
68 // ====================
69 println!("\n[2] Filtering Performance (id > 500000)");
70
71 // Add a condition column
72 let condition_data: Vec<bool> = (0..ROWS).map(|i| i > ROWS / 2).collect();
73 df.add_column(
74 "filter_condition".to_string(),
75 Column::Boolean(BooleanColumn::new(condition_data)),
76 )
77 .unwrap();
78
79 // Serial filtering
80 let mut serial_total = std::time::Duration::new(0, 0);
81 for _ in 0..3 {
82 let start = Instant::now();
83 let filtered_df = df.filter("filter_condition").unwrap();
84 let duration = start.elapsed();
85 serial_total += duration;
86 println!("Serial filtering time (1 run): {:?}", duration);
87 println!("Filtered row count: {}", filtered_df.row_count());
88 }
89 let serial_time = serial_total / 3;
90 println!("Average serial filtering time: {:?}", serial_time);
91
92 // Parallel filtering
93 let mut parallel_total = std::time::Duration::new(0, 0);
94 for _ in 0..3 {
95 let start = Instant::now();
96 let par_filtered_df = df.par_filter("filter_condition").unwrap();
97 let duration = start.elapsed();
98 parallel_total += duration;
99 println!("Parallel filtering time (1 run): {:?}", duration);
100 println!("Filtered row count: {}", par_filtered_df.row_count());
101 }
102 let parallel_time = parallel_total / 3;
103 println!("Average parallel filtering time: {:?}", parallel_time);
104
105 println!(
106 "Speedup: {:.2}x",
107 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
108 );
109
110 // ====================
111 // Grouping and Aggregation
112 // ====================
113 println!("\n[3] Grouping and Aggregation (average by category)");
114
115 // Serial grouping and aggregation
116 let small_df = df.select(&["category", "value"]).unwrap();
117
118 let mut serial_total = std::time::Duration::new(0, 0);
119 for _ in 0..3 {
120 let start = Instant::now();
121 let lazy_df = LazyFrame::new(small_df.clone());
122 let grouped_df = lazy_df
123 .aggregate(
124 vec!["category".to_string()],
125 vec![(
126 "value".to_string(),
127 AggregateOp::Mean,
128 "value_mean".to_string(),
129 )],
130 )
131 .execute()
132 .unwrap();
133 let duration = start.elapsed();
134 serial_total += duration;
135 println!(
136 "Serial grouping and aggregation time (1 run): {:?}",
137 duration
138 );
139 println!("Group count: {}", grouped_df.row_count());
140 }
141 let serial_time = serial_total / 3;
142 println!(
143 "Average serial grouping and aggregation time: {:?}",
144 serial_time
145 );
146
147 // Parallel grouping and aggregation
148 let mut parallel_total = std::time::Duration::new(0, 0);
149 for _ in 0..3 {
150 let start = Instant::now();
151 let grouped_map = small_df.par_groupby(&["category"]).unwrap();
152
153 let mut result_df = OptimizedDataFrame::new();
154 let mut categories = Vec::with_capacity(grouped_map.len());
155 let mut means = Vec::with_capacity(grouped_map.len());
156
157 for (category, group_df) in &grouped_map {
158 categories.push(category.clone());
159
160 let value_col = group_df.column("value").unwrap();
161 if let Some(float_col) = value_col.as_float64() {
162 let mut sum = 0.0;
163 let mut count = 0;
164
165 for i in 0..float_col.len() {
166 if let Ok(Some(val)) = float_col.get(i) {
167 sum += val;
168 count += 1;
169 }
170 }
171
172 let mean = if count > 0 { sum / count as f64 } else { 0.0 };
173 means.push(mean);
174 }
175 }
176
177 result_df
178 .add_column(
179 "category".to_string(),
180 Column::String(StringColumn::new(categories)),
181 )
182 .unwrap();
183 result_df
184 .add_column(
185 "value_mean".to_string(),
186 Column::Float64(Float64Column::new(means)),
187 )
188 .unwrap();
189
190 let duration = start.elapsed();
191 parallel_total += duration;
192 println!(
193 "Parallel grouping and aggregation time (1 run): {:?}",
194 duration
195 );
196 println!("Group count: {}", result_df.row_count());
197 }
198 let parallel_time = parallel_total / 3;
199 println!(
200 "Average parallel grouping and aggregation time: {:?}",
201 parallel_time
202 );
203
204 println!(
205 "Speedup: {:.2}x",
206 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
207 );
208
209 // ====================
210 // Computation (double all values)
211 // ====================
212 println!("\n[4] Computation (double the values in the 'value' column)");
213
214 // Serial computation
215 let start = Instant::now();
216 let mut computed_df = OptimizedDataFrame::new();
217
218 for name in df.column_names() {
219 let col_view = df.column(name).unwrap();
220
221 let new_col = if name == "value" {
222 let float_col = col_view.as_float64().unwrap();
223 let mut doubled_values = Vec::with_capacity(float_col.len());
224
225 for i in 0..float_col.len() {
226 if let Ok(Some(val)) = float_col.get(i) {
227 doubled_values.push(val * 2.0);
228 } else {
229 doubled_values.push(0.0);
230 }
231 }
232
233 Column::Float64(Float64Column::new(doubled_values))
234 } else {
235 col_view.into_column()
236 };
237
238 computed_df.add_column(name.to_string(), new_col).unwrap();
239 }
240
241 let serial_time = start.elapsed();
242 println!("Serial computation time: {:?}", serial_time);
243
244 // Parallel computation
245 let start = Instant::now();
246 let _par_computed_df = df
247 .par_apply(|view| {
248 if view.as_float64().is_some() {
249 if let Some(float_col) = view.as_float64() {
250 use rayon::prelude::*;
251
252 let values = (0..float_col.len())
253 .into_par_iter()
254 .map(|i| {
255 if let Ok(Some(val)) = float_col.get(i) {
256 val * 2.0
257 } else {
258 0.0
259 }
260 })
261 .collect::<Vec<_>>();
262
263 Ok(Column::Float64(Float64Column::new(values)))
264 } else {
265 Ok(view.clone().into_column())
266 }
267 } else {
268 Ok(view.clone().into_column())
269 }
270 })
271 .unwrap();
272
273 let parallel_time = start.elapsed();
274 println!("Parallel computation time: {:?}", parallel_time);
275
276 println!(
277 "Speedup: {:.2}x",
278 serial_time.as_secs_f64() / parallel_time.as_secs_f64()
279 );
280
281 println!("\nParallel Benchmark Complete");
282}
Source pub fn explain(&self) -> String
pub fn explain(&self) -> String
Display optimized computation graph
Examples found in repository?
examples/lazy_parallel_example.rs (line 134)
8fn main() -> Result<(), Box<dyn Error>> {
9 println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11 // Generate a large DataFrame
12 println!("Generating a large dataset...");
13 let rows = 100_000;
14 let df = generate_large_dataframe(rows)?;
15 println!("Generated {} rows of data", rows);
16
17 // Display a portion of the DataFrame
18 println!("\nFirst row of data:");
19 println!("{:?}\n", df);
20
21 // Filtering and aggregation with the standard approach
22 println!("Executing data processing with the standard approach...");
23 let start = Instant::now();
24
25 // Create an age filter (30 years and older)
26 let age_col = df.column("Age")?;
27 let mut age_filter = vec![false; df.row_count()];
28 if let Some(int_col) = age_col.as_int64() {
29 for i in 0..df.row_count() {
30 if let Ok(Some(age)) = int_col.get(i) {
31 age_filter[i] = age >= 30;
32 }
33 }
34 }
35
36 // Add the filter to the DataFrame
37 let bool_data = pandrs::BooleanColumn::new(age_filter);
38 let mut df_with_filter = df.clone();
39 df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41 // Execute filtering
42 let filtered_df = df_with_filter.filter("30 and older")?;
43
44 // Manually aggregate by department
45 let dept_col = filtered_df.column("Department")?;
46 let salary_col = filtered_df.column("Salary")?;
47
48 // Aggregation by department
49 let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50 std::collections::HashMap::new();
51
52 if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53 for i in 0..filtered_df.row_count() {
54 if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55 let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56 entry.0 += salary;
57 entry.1 += 1;
58 }
59 }
60 }
61
62 // Construct the result
63 let mut result_depts = Vec::new();
64 let mut result_totals = Vec::new();
65 let mut result_avgs = Vec::new();
66 let mut result_counts = Vec::new();
67
68 for (dept, (total, count)) in dept_totals {
69 result_depts.push(dept);
70 result_totals.push(total);
71 result_avgs.push(total / count as f64);
72 result_counts.push(count as f64);
73 }
74
75 // Create the result DataFrame
76 let mut result_df = OptimizedDataFrame::new();
77 result_df.add_column(
78 "Department",
79 Column::String(StringColumn::new(result_depts)),
80 )?;
81 result_df.add_column(
82 "Total Salary",
83 Column::Float64(Float64Column::new(result_totals)),
84 )?;
85 result_df.add_column(
86 "Average Salary",
87 Column::Float64(Float64Column::new(result_avgs)),
88 )?;
89 result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91 let standard_duration = start.elapsed();
92 println!(
93 "Processing time with the standard approach: {:?}",
94 standard_duration
95 );
96 println!("\nResults of the standard approach:");
97 println!("{:?}\n", result_df);
98
99 // Approach using LazyFrame and parallel processing
100 println!("Approach using LazyFrame and parallel processing...");
101 let start = Instant::now();
102
103 // First, create a boolean column for the filter
104 let age_col = df.column("Age")?;
105 let mut age_filter = vec![false; df.row_count()];
106 if let Some(int_col) = age_col.as_int64() {
107 for i in 0..df.row_count() {
108 if let Ok(Some(age)) = int_col.get(i) {
109 age_filter[i] = age >= 30;
110 }
111 }
112 }
113
114 // Add the filter to the DataFrame
115 let mut df_with_age_filter = df.clone();
116 let bool_data = pandrs::BooleanColumn::new(age_filter);
117 df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119 // Recreate the LazyFrame
120 let lazy_df = LazyFrame::new(df_with_age_filter);
121 let result_lazy = lazy_df
122 .filter("Age Filter") // Use the newly added boolean column for filtering
123 .aggregate(
124 vec!["Department".to_string()],
125 vec![
126 ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127 ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128 ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129 ]
130 );
131
132 // Display the execution plan
133 println!("\nExecution Plan:");
134 println!("{}", result_lazy.explain());
135
136 // Execute
137 let lazy_result = result_lazy.execute()?;
138
139 let lazy_duration = start.elapsed();
140 println!(
141 "Processing time with the LazyFrame approach: {:?}",
142 lazy_duration
143 );
144 println!("\nResults of the LazyFrame approach:");
145 println!("{:?}\n", lazy_result);
146
147 // Performance comparison
148 let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149 println!(
150 "The LazyFrame approach is {:.2} times faster than the standard approach",
151 speedup
152 );
153
154 Ok(())
155}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for LazyFrame
impl !RefUnwindSafe for LazyFrame
impl Send for LazyFrame
impl Sync for LazyFrame
impl Unpin for LazyFrame
impl !UnwindSafe for LazyFrame
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left` is `true`. Converts `self` into a `Right` variant of `Either<Self, Self>` otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left(&self)` returns `true`. Converts `self` into a `Right` variant of `Either<Self, Self>` otherwise. Read more