lazy_parallel_example/
lazy_parallel_example.rs1use std::error::Error;
2use std::time::Instant;
3
4use pandrs::{
5 AggregateOp, Column, Float64Column, Int64Column, LazyFrame, OptimizedDataFrame, StringColumn,
6};
7
8fn main() -> Result<(), Box<dyn Error>> {
9 println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11 println!("Generating a large dataset...");
13 let rows = 100_000;
14 let df = generate_large_dataframe(rows)?;
15 println!("Generated {} rows of data", rows);
16
17 println!("\nFirst row of data:");
19 println!("{:?}\n", df);
20
21 println!("Executing data processing with the standard approach...");
23 let start = Instant::now();
24
25 let age_col = df.column("Age")?;
27 let mut age_filter = vec![false; df.row_count()];
28 if let Some(int_col) = age_col.as_int64() {
29 for i in 0..df.row_count() {
30 if let Ok(Some(age)) = int_col.get(i) {
31 age_filter[i] = age >= 30;
32 }
33 }
34 }
35
36 let bool_data = pandrs::BooleanColumn::new(age_filter);
38 let mut df_with_filter = df.clone();
39 df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41 let filtered_df = df_with_filter.filter("30 and older")?;
43
44 let dept_col = filtered_df.column("Department")?;
46 let salary_col = filtered_df.column("Salary")?;
47
48 let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50 std::collections::HashMap::new();
51
52 if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53 for i in 0..filtered_df.row_count() {
54 if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55 let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56 entry.0 += salary;
57 entry.1 += 1;
58 }
59 }
60 }
61
62 let mut result_depts = Vec::new();
64 let mut result_totals = Vec::new();
65 let mut result_avgs = Vec::new();
66 let mut result_counts = Vec::new();
67
68 for (dept, (total, count)) in dept_totals {
69 result_depts.push(dept);
70 result_totals.push(total);
71 result_avgs.push(total / count as f64);
72 result_counts.push(count as f64);
73 }
74
75 let mut result_df = OptimizedDataFrame::new();
77 result_df.add_column(
78 "Department",
79 Column::String(StringColumn::new(result_depts)),
80 )?;
81 result_df.add_column(
82 "Total Salary",
83 Column::Float64(Float64Column::new(result_totals)),
84 )?;
85 result_df.add_column(
86 "Average Salary",
87 Column::Float64(Float64Column::new(result_avgs)),
88 )?;
89 result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91 let standard_duration = start.elapsed();
92 println!(
93 "Processing time with the standard approach: {:?}",
94 standard_duration
95 );
96 println!("\nResults of the standard approach:");
97 println!("{:?}\n", result_df);
98
99 println!("Approach using LazyFrame and parallel processing...");
101 let start = Instant::now();
102
103 let age_col = df.column("Age")?;
105 let mut age_filter = vec![false; df.row_count()];
106 if let Some(int_col) = age_col.as_int64() {
107 for i in 0..df.row_count() {
108 if let Ok(Some(age)) = int_col.get(i) {
109 age_filter[i] = age >= 30;
110 }
111 }
112 }
113
114 let mut df_with_age_filter = df.clone();
116 let bool_data = pandrs::BooleanColumn::new(age_filter);
117 df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119 let lazy_df = LazyFrame::new(df_with_age_filter);
121 let result_lazy = lazy_df
122 .filter("Age Filter") .aggregate(
124 vec!["Department".to_string()],
125 vec![
126 ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127 ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128 ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129 ]
130 );
131
132 println!("\nExecution Plan:");
134 println!("{}", result_lazy.explain());
135
136 let lazy_result = result_lazy.execute()?;
138
139 let lazy_duration = start.elapsed();
140 println!(
141 "Processing time with the LazyFrame approach: {:?}",
142 lazy_duration
143 );
144 println!("\nResults of the LazyFrame approach:");
145 println!("{:?}\n", lazy_result);
146
147 let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149 println!(
150 "The LazyFrame approach is {:.2} times faster than the standard approach",
151 speedup
152 );
153
154 Ok(())
155}
156
157fn generate_large_dataframe(rows: usize) -> Result<OptimizedDataFrame, Box<dyn Error>> {
159 use rand::rngs::StdRng;
160 use rand::{Rng, SeedableRng};
161
162 let mut rng = StdRng::seed_from_u64(42); let mut ids = Vec::with_capacity(rows);
166 let mut ages = Vec::with_capacity(rows);
167 let mut depts = Vec::with_capacity(rows);
168 let mut salaries = Vec::with_capacity(rows);
169
170 let departments = vec![
172 "Sales".to_string(),
173 "Development".to_string(),
174 "HR".to_string(),
175 "Finance".to_string(),
176 "Marketing".to_string(),
177 ];
178
179 for i in 0..rows {
180 ids.push(i as i64 + 1000); ages.push(rng.random_range(20..60)); depts.push(departments[rng.random_range(0..departments.len())].clone()); let base_salary = match depts.last().unwrap().as_str() {
186 "Sales" => 350_000.0,
187 "Development" => 400_000.0,
188 "HR" => 320_000.0,
189 "Finance" => 380_000.0,
190 "Marketing" => 360_000.0,
191 _ => 300_000.0,
192 };
193
194 let age_factor = *ages.last().unwrap() as f64 / 30.0;
195 let variation = rng.random_range(0.8..1.2);
196
197 salaries.push(base_salary * age_factor * variation);
198 }
199
200 let mut df = OptimizedDataFrame::new();
202 df.add_column("ID", Column::Int64(Int64Column::new(ids)))?;
203 df.add_column("Age", Column::Int64(Int64Column::new(ages)))?;
204 df.add_column("Department", Column::String(StringColumn::new(depts)))?;
205 df.add_column("Salary", Column::Float64(Float64Column::new(salaries)))?;
206
207 Ok(df)
208}